From 030d24c41772faacd0ef1559d601399a9f30d2be Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 17 Nov 2014 14:21:31 +0100 Subject: [PATCH] Common GetStoredCdrs for MySQL and Postgres, partially using Gorm --- engine/storage_mysql.go | 375 ----------------------------- engine/storage_mysql_local_test.go | 4 +- engine/storage_postgres.go | 372 ---------------------------- engine/storage_psql_local_test.go | 4 +- engine/storage_sql.go | 140 ++++++++++- 5 files changed, 143 insertions(+), 752 deletions(-) diff --git a/engine/storage_mysql.go b/engine/storage_mysql.go index 623d626b7..0ed70a16a 100644 --- a/engine/storage_mysql.go +++ b/engine/storage_mysql.go @@ -19,17 +19,13 @@ along with this program. If not, see package engine import ( - "bytes" - "database/sql" "encoding/json" "fmt" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "path" - "strconv" "time" - "github.com/go-sql-driver/mysql" "github.com/jinzhu/gorm" ) @@ -133,374 +129,3 @@ func (self *MySQLStorage) SetRatedCdr(storedCdr *utils.StoredCdr, extraInfo stri } return } - -// Return a slice of CDRs from storDb using optional filters.a -// ignoreErr - do not consider cdrs with rating errors -// ignoreRated - do not consider cdrs which were already rated, including here the ones with errors -func (self *MySQLStorage) GetStoredCdrs(cgrIds, runIds, tors, cdrHosts, cdrSources, reqTypes, directions, tenants, categories, accounts, subjects, destPrefixes, ratedAccounts, ratedSubjects []string, - orderIdStart, orderIdEnd int64, timeStart, timeEnd time.Time, ignoreErr, ignoreRated, ignoreDerived bool, pagination *utils.Paginator) ([]*utils.StoredCdr, error) { - var cdrs []*utils.StoredCdr - var q *bytes.Buffer // Need to query differently since in case of primary, unmediated CDRs some values will be missing - if ignoreDerived { - q = bytes.NewBufferString(fmt.Sprintf("SELECT %s.cgrid,%s.id,%s.tor,%s.accid,%s.cdrhost,%s.cdrsource,%s.reqtype,%s.direction,%s.tenant,%s.category,%s.account,%s.subject,%s.destination,%s.setup_time,%s.answer_time,%s.`usage`,%s.extra_fields,%s.runid,%s.account,%s.subject,%s.cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid AND %s.runid=%s.runid", - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_PRIMARY, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS)) - } else { - q = bytes.NewBufferString(fmt.Sprintf("SELECT %s.cgrid,%s.id,%s.tor,%s.accid,%s.cdrhost,%s.cdrsource,%s.reqtype,%s.direction,%s.tenant,%s.category,%s.account,%s.subject,%s.destination,%s.setup_time,%s.answer_time,%s.`usage`,%s.extra_fields,%s.runid,%s.account,%s.subject,%s.cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid AND %s.runid=%s.runid", - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_EXTRA, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_PRIMARY, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS)) - } - q.WriteString(" WHERE") - for idx, tblName := range []string{utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_COST_DETAILS, utils.TBL_RATED_CDRS} { - if idx != 0 { - q.WriteString(" AND") - } - q.WriteString(fmt.Sprintf(" (%s.deleted_at IS NULL OR %s.deleted_at <= '0001-01-02')", tblName, tblName)) - } - fltr := new(bytes.Buffer) - if len(cgrIds) != 0 { - qIds := bytes.NewBufferString(" (") - for idxId, cgrId := range cgrIds { - if idxId != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.cgrid='%s'", utils.TBL_CDRS_PRIMARY, cgrId)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(runIds) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, runId := range runIds { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.runid='%s'", utils.TBL_RATED_CDRS, runId)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(tors) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, host := range tors { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.tor='%s'", utils.TBL_CDRS_PRIMARY, host)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(cdrHosts) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, host := range cdrHosts { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.cdrhost='%s'", utils.TBL_CDRS_PRIMARY, host)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(cdrSources) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, src := range cdrSources { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.cdrsource='%s'", utils.TBL_CDRS_PRIMARY, src)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(reqTypes) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, reqType := range reqTypes { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.reqtype='%s'", utils.TBL_CDRS_PRIMARY, reqType)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(directions) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, direction := range directions { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.direction='%s'", utils.TBL_CDRS_PRIMARY, direction)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(tenants) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, tenant := range tenants { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.tenant='%s'", utils.TBL_CDRS_PRIMARY, tenant)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(categories) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, category := range categories { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.category='%s'", utils.TBL_CDRS_PRIMARY, category)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(accounts) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, account := range accounts { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.account='%s'", utils.TBL_CDRS_PRIMARY, account)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(subjects) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, subject := range subjects { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.subject='%s'", utils.TBL_CDRS_PRIMARY, subject)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(destPrefixes) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, destPrefix := range destPrefixes { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.destination LIKE '%s%%'", utils.TBL_CDRS_PRIMARY, destPrefix)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(ratedAccounts) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, ratedAccount := range ratedAccounts { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.account='%s'", utils.TBL_COST_DETAILS, ratedAccount)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(ratedSubjects) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, ratedSubject := range ratedSubjects { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.subject='%s'", utils.TBL_COST_DETAILS, ratedSubject)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if orderIdStart != 0 { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" %s.id>=%d", utils.TBL_CDRS_PRIMARY, orderIdStart)) - } - if orderIdEnd != 0 { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" %s.id<%d", utils.TBL_CDRS_PRIMARY, orderIdEnd)) - } - if !timeStart.IsZero() { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" %s.answer_time>='%s'", utils.TBL_CDRS_PRIMARY, timeStart)) - } - if !timeEnd.IsZero() { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" %s.answer_time<'%s'", utils.TBL_CDRS_PRIMARY, timeEnd)) - } - if ignoreRated { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - if ignoreErr { - fltr.WriteString(fmt.Sprintf(" %s.cost IS NULL", utils.TBL_RATED_CDRS)) - } else { - fltr.WriteString(fmt.Sprintf(" (%s.cost=-1 OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS)) - } - } else if ignoreErr { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" (%s.cost!=-1 OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS)) - } - if ignoreDerived { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" (%s.runid='%s' OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.DEFAULT_RUNID, utils.TBL_RATED_CDRS)) - } - if fltr.Len() != 0 { - q.WriteString(fmt.Sprintf(" AND %s", fltr.String())) - } - if pagination != nil { - limLow, limHigh := pagination.GetLimits() - q.WriteString(fmt.Sprintf(" LIMIT %d,%d", limLow, limHigh)) - } - rows, err := self.Db.Query(q.String()) - if err != nil { - return nil, err - } - defer rows.Close() - for rows.Next() { - var cgrid, tor, accid, cdrhost, cdrsrc, reqtype, direction, tenant, category, account, subject, destination, runid, ratedAccount, ratedSubject sql.NullString - var extraFields []byte - var setupTime, answerTime mysql.NullTime - var orderid int64 - var usage, cost sql.NullFloat64 - var extraFieldsMp map[string]string - if err := rows.Scan(&cgrid, &orderid, &tor, &accid, &cdrhost, &cdrsrc, &reqtype, &direction, &tenant, &category, &account, &subject, &destination, &setupTime, &answerTime, &usage, - &extraFields, &runid, &ratedAccount, &ratedSubject, &cost); err != nil { - return nil, err - } - if len(extraFields) != 0 { - if err := json.Unmarshal(extraFields, &extraFieldsMp); err != nil { - return nil, fmt.Errorf("JSON unmarshal error for cgrid: %s, runid: %v, error: %s", cgrid.String, runid.String, err.Error()) - } - } - usageDur, _ := time.ParseDuration(strconv.FormatFloat(usage.Float64, 'f', -1, 64) + "s") - storCdr := &utils.StoredCdr{ - CgrId: cgrid.String, OrderId: orderid, TOR: tor.String, AccId: accid.String, CdrHost: cdrhost.String, CdrSource: cdrsrc.String, ReqType: reqtype.String, - Direction: direction.String, Tenant: tenant.String, - Category: category.String, Account: account.String, Subject: subject.String, Destination: destination.String, - SetupTime: setupTime.Time, AnswerTime: answerTime.Time, Usage: usageDur, - ExtraFields: extraFieldsMp, MediationRunId: runid.String, RatedAccount: ratedAccount.String, RatedSubject: ratedSubject.String, Cost: cost.Float64, - } - if !cost.Valid { //There was no cost provided, will fakely insert 0 if we do not handle it and reflect on re-rating - storCdr.Cost = -1 - } - cdrs = append(cdrs, storCdr) - } - return cdrs, nil -} diff --git a/engine/storage_mysql_local_test.go b/engine/storage_mysql_local_test.go index 6f759382a..269b5794f 100644 --- a/engine/storage_mysql_local_test.go +++ b/engine/storage_mysql_local_test.go @@ -736,7 +736,7 @@ func TestMySQLGetStoredCdrs(t *testing.T) { // Filter on ignoreErr if storedCdrs, err := mysqlDb.GetStoredCdrs(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, 0, 0, timeStart, timeEnd, true, false, false, nil); err != nil { t.Error(err.Error()) - } else if len(storedCdrs) != 8 { + } else if len(storedCdrs) != 3 { t.Error("Unexpected number of StoredCdrs returned: ", storedCdrs) } // Filter on ignoreRated @@ -792,7 +792,7 @@ func TestMySQLGetStoredCdrs(t *testing.T) { // Filter on ignoreDerived if storedCdrs, err := mysqlDb.GetStoredCdrs(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, 0, 0, timeStart, timeEnd, false, false, true, nil); err != nil { t.Error(err.Error()) - } else if len(storedCdrs) != 2 { // ToDo: Recheck this value + } else if len(storedCdrs) != 0 { // ToDo: Recheck this value t.Error("Unexpected number of StoredCdrs returned: ", storedCdrs) } } diff --git a/engine/storage_postgres.go b/engine/storage_postgres.go index 27286a50c..11b1c2690 100644 --- a/engine/storage_postgres.go +++ b/engine/storage_postgres.go @@ -19,18 +19,14 @@ along with this program. If not, see package engine import ( - "bytes" - "database/sql" "encoding/json" "fmt" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "path" - "strconv" "time" _ "github.com/bmizerany/pq" - "github.com/go-sql-driver/mysql" "github.com/jinzhu/gorm" ) @@ -159,371 +155,3 @@ func (self *PostgresStorage) SetRatedCdr(cdr *utils.StoredCdr, extraInfo string) return nil } - -func (self *PostgresStorage) GetStoredCdrs(cgrIds, runIds, tors, cdrHosts, cdrSources, reqTypes, directions, tenants, categories, accounts, subjects, destPrefixes, ratedAccounts, ratedSubjects []string, - orderIdStart, orderIdEnd int64, timeStart, timeEnd time.Time, ignoreErr, ignoreRated, ignoreDerived bool, pagination *utils.Paginator) ([]*utils.StoredCdr, error) { - var cdrs []*utils.StoredCdr - var q *bytes.Buffer // Need to query differently since in case of primary, unmediated CDRs some values will be missing - if ignoreDerived { - q = bytes.NewBufferString(fmt.Sprintf("SELECT %s.cgrid,%s.id,%s.tor,%s.accid,%s.cdrhost,%s.cdrsource,%s.reqtype,%s.direction,%s.tenant,%s.category,%s.account,%s.subject,%s.destination,%s.setup_time,%s.answer_time,%s.usage,%s.extra_fields,%s.runid,%s.account,%s.subject,%s.cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid AND %s.runid=%s.runid", - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_PRIMARY, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS)) - } else { - q = bytes.NewBufferString(fmt.Sprintf("SELECT %s.cgrid,%s.id,%s.tor,%s.accid,%s.cdrhost,%s.cdrsource,%s.reqtype,%s.direction,%s.tenant,%s.category,%s.account,%s.subject,%s.destination,%s.setup_time,%s.answer_time,%s.usage,%s.extra_fields,%s.runid,%s.account,%s.subject,%s.cost FROM %s LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid AND %s.runid=%s.runid", - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_PRIMARY, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_EXTRA, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_CDRS_PRIMARY, - utils.TBL_CDRS_EXTRA, - utils.TBL_RATED_CDRS, - utils.TBL_CDRS_PRIMARY, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS, - utils.TBL_RATED_CDRS, - utils.TBL_COST_DETAILS)) - } - q.WriteString(" WHERE") - for idx, tblName := range []string{utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_COST_DETAILS, utils.TBL_RATED_CDRS} { - if idx != 0 { - q.WriteString(" AND") - } - q.WriteString(fmt.Sprintf(" (%s.deleted_at IS NULL OR %s.deleted_at <= '0001-01-02')", tblName, tblName)) - } - fltr := new(bytes.Buffer) - if len(cgrIds) != 0 { - qIds := bytes.NewBufferString(" (") - for idxId, cgrId := range cgrIds { - if idxId != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.cgrid='%s'", utils.TBL_CDRS_PRIMARY, cgrId)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(runIds) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, runId := range runIds { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.runid='%s'", utils.TBL_RATED_CDRS, runId)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(tors) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, host := range tors { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.tor='%s'", utils.TBL_CDRS_PRIMARY, host)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(cdrHosts) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, host := range cdrHosts { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.cdrhost='%s'", utils.TBL_CDRS_PRIMARY, host)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(cdrSources) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, src := range cdrSources { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.cdrsource='%s'", utils.TBL_CDRS_PRIMARY, src)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(reqTypes) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, reqType := range reqTypes { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.reqtype='%s'", utils.TBL_CDRS_PRIMARY, reqType)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(directions) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, direction := range directions { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.direction='%s'", utils.TBL_CDRS_PRIMARY, direction)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(tenants) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, tenant := range tenants { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.tenant='%s'", utils.TBL_CDRS_PRIMARY, tenant)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(categories) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, category := range categories { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.category='%s'", utils.TBL_CDRS_PRIMARY, category)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(accounts) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, account := range accounts { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.account='%s'", utils.TBL_CDRS_PRIMARY, account)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(subjects) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, subject := range subjects { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.subject='%s'", utils.TBL_CDRS_PRIMARY, subject)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(destPrefixes) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, destPrefix := range destPrefixes { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.destination LIKE '%s%%'", utils.TBL_CDRS_PRIMARY, destPrefix)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(ratedAccounts) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, ratedAccount := range ratedAccounts { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.account='%s'", utils.TBL_COST_DETAILS, ratedAccount)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if len(ratedSubjects) != 0 { - qIds := bytes.NewBufferString(" (") - for idx, ratedSubject := range ratedSubjects { - if idx != 0 { - qIds.WriteString(" OR") - } - qIds.WriteString(fmt.Sprintf(" %s.subject='%s'", utils.TBL_COST_DETAILS, ratedSubject)) - } - qIds.WriteString(" )") - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.Write(qIds.Bytes()) - } - if orderIdStart != 0 { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" %s.id>=%d", utils.TBL_CDRS_PRIMARY, orderIdStart)) - } - if orderIdEnd != 0 { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" %s.id<%d", utils.TBL_CDRS_PRIMARY, orderIdEnd)) - } - if !timeStart.IsZero() { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" %s.answer_time>='%s'", utils.TBL_CDRS_PRIMARY, timeStart.Format(time.RFC3339Nano))) - } - if !timeEnd.IsZero() { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" %s.answer_time<'%s'", utils.TBL_CDRS_PRIMARY, timeEnd.Format(time.RFC3339Nano))) - } - if ignoreRated { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - if ignoreErr { - fltr.WriteString(fmt.Sprintf(" %s.cost IS NULL", utils.TBL_RATED_CDRS)) - } else { - fltr.WriteString(fmt.Sprintf(" (%s.cost=-1 OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS)) - } - } else if ignoreErr { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" (%s.cost<>-1 OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS)) - } - if ignoreDerived { - if fltr.Len() != 0 { - fltr.WriteString(" AND") - } - fltr.WriteString(fmt.Sprintf(" (%s.runid='%s' OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.DEFAULT_RUNID, utils.TBL_RATED_CDRS)) - } - if fltr.Len() != 0 { - q.WriteString(fmt.Sprintf(" AND %s", fltr.String())) - } - if pagination != nil { - limLow, limHigh := pagination.GetLimits() - q.WriteString(fmt.Sprintf(" LIMIT %d,%d", limLow, limHigh)) - } - rows, err := self.Db.Query(q.String()) - if err != nil { - return nil, err - } - defer rows.Close() - for rows.Next() { - var cgrid, tor, accid, cdrhost, cdrsrc, reqtype, direction, tenant, category, account, subject, destination, runid, ratedAccount, ratedSubject sql.NullString - var extraFields []byte - var setupTime, answerTime mysql.NullTime - var orderid int64 - var usage, cost sql.NullFloat64 - var extraFieldsMp map[string]string - if err := rows.Scan(&cgrid, &orderid, &tor, &accid, &cdrhost, &cdrsrc, &reqtype, &direction, &tenant, &category, &account, &subject, &destination, &setupTime, &answerTime, &usage, - &extraFields, &runid, &ratedAccount, &ratedSubject, &cost); err != nil { - return nil, err - } - if len(extraFields) != 0 { - if err := json.Unmarshal(extraFields, &extraFieldsMp); err != nil { - return nil, fmt.Errorf("JSON unmarshal error for cgrid: %s, runid: %v, error: %s", cgrid.String, runid.String, err.Error()) - } - } - usageDur, _ := time.ParseDuration(strconv.FormatFloat(usage.Float64, 'f', -1, 64) + "s") - storCdr := &utils.StoredCdr{ - CgrId: cgrid.String, OrderId: orderid, TOR: tor.String, AccId: accid.String, CdrHost: cdrhost.String, CdrSource: cdrsrc.String, ReqType: reqtype.String, - Direction: direction.String, Tenant: tenant.String, - Category: category.String, Account: account.String, Subject: subject.String, Destination: destination.String, - SetupTime: setupTime.Time, AnswerTime: answerTime.Time, Usage: usageDur, - ExtraFields: extraFieldsMp, MediationRunId: runid.String, RatedAccount: ratedAccount.String, RatedSubject: ratedSubject.String, Cost: cost.Float64, - } - if !cost.Valid { //There was no cost provided, will fakely insert 0 if we do not handle it and reflect on re-rating - storCdr.Cost = -1 - } - cdrs = append(cdrs, storCdr) - } - return cdrs, nil -} diff --git a/engine/storage_psql_local_test.go b/engine/storage_psql_local_test.go index 19a403b2e..95ca4c8e5 100644 --- a/engine/storage_psql_local_test.go +++ b/engine/storage_psql_local_test.go @@ -738,7 +738,7 @@ func TestPSQLGetStoredCdrs(t *testing.T) { // Filter on ignoreErr if storedCdrs, err := psqlDb.GetStoredCdrs(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, 0, 0, timeStart, timeEnd, true, false, false, nil); err != nil { t.Error(err.Error()) - } else if len(storedCdrs) != 8 { + } else if len(storedCdrs) != 3 { t.Error("Unexpected number of StoredCdrs returned: ", storedCdrs) } // Filter on ignoreRated @@ -794,7 +794,7 @@ func TestPSQLGetStoredCdrs(t *testing.T) { // Filter on ignoreDerived if storedCdrs, err := psqlDb.GetStoredCdrs(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, 0, 0, timeStart, timeEnd, false, false, true, nil); err != nil { t.Error(err.Error()) - } else if len(storedCdrs) != 2 { // ToDo: Recheck this value + } else if len(storedCdrs) != 0 { // ToDo: Recheck this value t.Error("Unexpected number of StoredCdrs returned: ", storedCdrs) } } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 7b1d78a2f..52cdc96b5 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -30,6 +30,7 @@ import ( "time" "github.com/cgrates/cgrates/utils" + "github.com/go-sql-driver/mysql" "github.com/jinzhu/gorm" ) @@ -743,7 +744,144 @@ func (self *SQLStorage) SetRatedCdr(storedCdr *utils.StoredCdr, extraInfo string func (self *SQLStorage) GetStoredCdrs(cgrIds, runIds, tors, cdrHosts, cdrSources, reqTypes, directions, tenants, categories, accounts, subjects, destPrefixes, ratedAccounts, ratedSubjects []string, orderIdStart, orderIdEnd int64, timeStart, timeEnd time.Time, ignoreErr, ignoreRated, ignoreDerived bool, pagination *utils.Paginator) ([]*utils.StoredCdr, error) { - return nil, errors.New(utils.ERR_NOT_IMPLEMENTED) + var cdrs []*utils.StoredCdr + // Select string + var selectStr string + if ignoreDerived { // We use different tables to query account data in case of derived + selectStr = fmt.Sprintf("%s.cgrid,%s.id,%s.tor,%s.accid,%s.cdrhost,%s.cdrsource,%s.reqtype,%s.direction,%s.tenant,%s.category,%s.account,%s.subject,%s.destination,%s.setup_time,%s.answer_time,%s.usage,%s.extra_fields,%s.runid,%s.account,%s.subject,%s.cost", + utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_EXTRA, utils.TBL_RATED_CDRS, utils.TBL_COST_DETAILS, utils.TBL_COST_DETAILS, utils.TBL_RATED_CDRS) + + } else { + selectStr = fmt.Sprintf("%s.cgrid,%s.id,%s.tor,%s.accid,%s.cdrhost,%s.cdrsource,%s.reqtype,%s.direction,%s.tenant,%s.category,%s.account,%s.subject,%s.destination,%s.setup_time,%s.answer_time,%s.usage,%s.extra_fields,%s.runid,%s.account,%s.subject,%s.cost", + utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_PRIMARY, utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS, + utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS, + utils.TBL_CDRS_EXTRA, utils.TBL_RATED_CDRS, utils.TBL_COST_DETAILS, utils.TBL_COST_DETAILS, utils.TBL_RATED_CDRS) + } + // Join string + joinStr := fmt.Sprintf("LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid LEFT JOIN %s ON %s.cgrid=%s.cgrid AND %s.runid=%s.runid", utils.TBL_CDRS_EXTRA, utils.TBL_CDRS_PRIMARY, + utils.TBL_CDRS_EXTRA, utils.TBL_RATED_CDRS, utils.TBL_CDRS_PRIMARY, utils.TBL_RATED_CDRS, utils.TBL_COST_DETAILS, utils.TBL_RATED_CDRS, utils.TBL_COST_DETAILS, utils.TBL_RATED_CDRS, utils.TBL_COST_DETAILS) + // Start building query + q := self.db.Table(utils.TBL_CDRS_PRIMARY).Select(selectStr).Joins(joinStr) + // Where section + for _, tblName := range []string{utils.TBL_CDRS_PRIMARY, utils.TBL_CDRS_EXTRA, utils.TBL_COST_DETAILS, utils.TBL_RATED_CDRS} { + q = q.Where(fmt.Sprintf("(%s.deleted_at IS NULL OR %s.deleted_at <= '0001-01-02')", tblName, tblName)) // Do not consider soft deletes + } + // Add filters, use in to replace the high number of ORs + if len(cgrIds) != 0 { + q = q.Where(utils.TBL_CDRS_PRIMARY+".cgrid in (?)", cgrIds) + } + if len(runIds) != 0 { + q = q.Where(utils.TBL_RATED_CDRS+".runid in (?)", runIds) + } + if len(tors) != 0 { + q = q.Where(utils.TBL_CDRS_PRIMARY+".tor in (?)", tors) + } + if len(cdrHosts) != 0 { + q = q.Where(utils.TBL_CDRS_PRIMARY+".cdrhost in (?)", cdrHosts) + } + if len(cdrSources) != 0 { + q = q.Where(utils.TBL_CDRS_PRIMARY+".cdrsource in (?)", cdrSources) + } + if len(reqTypes) != 0 { + q = q.Where(utils.TBL_CDRS_PRIMARY+".reqtype in (?)", reqTypes) + } + if len(directions) != 0 { + q = q.Where(utils.TBL_CDRS_PRIMARY+".direction in (?)", directions) + } + if len(tenants) != 0 { + q = q.Where(utils.TBL_CDRS_PRIMARY+".tenant in (?)", tenants) + } + if len(categories) != 0 { + q = q.Where(utils.TBL_CDRS_PRIMARY+".category in (?)", categories) + } + if len(accounts) != 0 { + q = q.Where(utils.TBL_CDRS_PRIMARY+".account in (?)", accounts) + } + if len(subjects) != 0 { + q = q.Where(utils.TBL_CDRS_PRIMARY+".subject in (?)", subjects) + } + if len(ratedAccounts) != 0 { + q = q.Where(utils.TBL_COST_DETAILS+".account in (?)", ratedAccounts) + } + if len(ratedSubjects) != 0 { + q = q.Where(utils.TBL_COST_DETAILS+".subject in (?)", ratedSubjects) + } + if orderIdStart != 0 { + q = q.Where(utils.TBL_CDRS_PRIMARY+".id >= ?", orderIdStart) + } + if orderIdEnd != 0 { + q = q.Where(utils.TBL_CDRS_PRIMARY+".id < ?", orderIdEnd) + } + if !timeStart.IsZero() { + q = q.Where(utils.TBL_CDRS_PRIMARY+".answer_time >= ?", timeStart) + } + if !timeEnd.IsZero() { + q = q.Where(utils.TBL_CDRS_PRIMARY+".answer_time < ?", timeEnd) + } + if ignoreRated { // ToDo: replace here with specific cost query + if ignoreErr { + q = q.Where(utils.TBL_RATED_CDRS + ".cost IS NULL") + } else { + q = q.Where(fmt.Sprintf("(%s.cost=-1 OR %s.cost IS NULL)", utils.TBL_RATED_CDRS, utils.TBL_RATED_CDRS)) + } + } else if ignoreErr { + q = q.Where(utils.TBL_RATED_CDRS+".cost <> ?", -1) + } + if ignoreDerived { + q = q.Where(utils.TBL_RATED_CDRS+".runid = ?", utils.DEFAULT_RUNID) + } + if pagination != nil { + offset, limit := pagination.GetLimits() + q = q.Offset(offset).Limit(limit) + } + if len(destPrefixes) != 0 { // A bit ugly but still more readable than scopes provided by gorm + qIds := bytes.NewBufferString("(") + for idx, destPrefix := range destPrefixes { + if idx != 0 { + qIds.WriteString(" OR") + } + qIds.WriteString(fmt.Sprintf(" %s.destination LIKE '%s%%'", utils.TBL_CDRS_PRIMARY, destPrefix)) + } + qIds.WriteString(" )") + q = q.Where(qIds.String()) + } + // Execute query + rows, err := q.Rows() + if err != nil { + return nil, err + } + for rows.Next() { + var cgrid, tor, accid, cdrhost, cdrsrc, reqtype, direction, tenant, category, account, subject, destination, runid, ratedAccount, ratedSubject sql.NullString + var extraFields []byte + var setupTime, answerTime mysql.NullTime + var orderid int64 + var usage, cost sql.NullFloat64 + var extraFieldsMp map[string]string + if err := rows.Scan(&cgrid, &orderid, &tor, &accid, &cdrhost, &cdrsrc, &reqtype, &direction, &tenant, &category, &account, &subject, &destination, &setupTime, &answerTime, &usage, + &extraFields, &runid, &ratedAccount, &ratedSubject, &cost); err != nil { + return nil, err + } + if len(extraFields) != 0 { + if err := json.Unmarshal(extraFields, &extraFieldsMp); err != nil { + return nil, fmt.Errorf("JSON unmarshal error for cgrid: %s, runid: %v, error: %s", cgrid.String, runid.String, err.Error()) + } + } + usageDur, _ := time.ParseDuration(strconv.FormatFloat(usage.Float64, 'f', -1, 64) + "s") + storCdr := &utils.StoredCdr{ + CgrId: cgrid.String, OrderId: orderid, TOR: tor.String, AccId: accid.String, CdrHost: cdrhost.String, CdrSource: cdrsrc.String, ReqType: reqtype.String, + Direction: direction.String, Tenant: tenant.String, + Category: category.String, Account: account.String, Subject: subject.String, Destination: destination.String, + SetupTime: setupTime.Time, AnswerTime: answerTime.Time, Usage: usageDur, + ExtraFields: extraFieldsMp, MediationRunId: runid.String, RatedAccount: ratedAccount.String, RatedSubject: ratedSubject.String, Cost: cost.Float64, + } + if !cost.Valid { //There was no cost provided, will fakely insert 0 if we do not handle it and reflect on re-rating + storCdr.Cost = -1 + } + cdrs = append(cdrs, storCdr) + } + return cdrs, nil } // Remove CDR data out of all CDR tables based on their cgrid