Add support for cdrs and sessions_costs for mysql

This commit is contained in:
TeoV
2018-05-07 10:28:20 -04:00
committed by Dan Christian Bogos
parent e22ff47fb3
commit e13a10ea6c
8 changed files with 344 additions and 81 deletions

View File

@@ -143,22 +143,19 @@ func (m *Migrator) migrateAccounts() (err error) {
}
current := engine.CurrentDataDBVersions()
switch vrs[utils.Accounts] {
case current[utils.Accounts]:
if m.sameDataDB {
return
}
if err := m.migrateCurrentAccounts(); err != nil {
return err
}
return
case 1:
if err := m.migrateV1Accounts(); err != nil {
return err
}
return m.migrateV1Accounts()
case 2:
if err := m.migrateV2Accounts(); err != nil {
return err
}
fallthrough
case current[utils.Accounts]:
if m.sameDataDB {
return
}
return m.migrateCurrentAccounts()
}
return
}

View File

@@ -45,97 +45,46 @@ var (
)
var sTestsAccIT = []func(t *testing.T){
testAccITConnect,
testAccITFlush,
testAccITMigrateAndMove,
}
func TestAccountITRedisConnection(t *testing.T) {
func TestAccountITRedis(t *testing.T) {
var err error
accPathIn = path.Join(*dataDir, "conf", "samples", "tutmysql")
accCfgIn, err = config.NewCGRConfigFromFolder(accPathIn)
if err != nil {
t.Fatal(err)
}
dataDBIn, err := engine.ConfigureDataStorage(accCfgIn.DataDbType,
accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName,
accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding,
config.CgrConfig().CacheCfg(), *loadHistorySize)
accCfgOut, err = config.NewCGRConfigFromFolder(accPathIn)
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
dataDBOut, err := engine.ConfigureDataStorage(accCfgIn.DataDbType,
accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName,
accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding,
config.CgrConfig().CacheCfg(), *loadHistorySize)
if err != nil {
log.Fatal(err)
}
oldDataDB, err := ConfigureV1DataStorage(accCfgIn.DataDbType,
accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName,
accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding)
if err != nil {
log.Fatal(err)
}
accMigrator, err = NewMigrator(dataDBIn, dataDBOut, accCfgIn.DataDbType,
accCfgIn.DBDataEncoding, nil, nil, accCfgIn.StorDBType, oldDataDB,
accCfgIn.DataDbType, accCfgIn.DBDataEncoding, nil, accCfgIn.StorDBType,
false, false, false, false, false)
if err != nil {
log.Fatal(err)
}
}
func TestAccountITRedis(t *testing.T) {
accAction = utils.Migrate
for _, stest := range sTestsAccIT {
t.Run("TestAccountITMigrateRedis", stest)
}
}
func TestAccountITMongoConnection(t *testing.T) {
func TestAccountITMongo(t *testing.T) {
var err error
accPathIn = path.Join(*dataDir, "conf", "samples", "tutmongo")
accCfgIn, err = config.NewCGRConfigFromFolder(accPathIn)
if err != nil {
t.Fatal(err)
}
dataDBIn, err := engine.ConfigureDataStorage(accCfgIn.DataDbType,
accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName,
accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding,
config.CgrConfig().CacheCfg(), *loadHistorySize)
accCfgOut, err = config.NewCGRConfigFromFolder(accPathIn)
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
dataDBOut, err := engine.ConfigureDataStorage(accCfgIn.DataDbType,
accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName,
accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding,
config.CgrConfig().CacheCfg(), *loadHistorySize)
if err != nil {
log.Fatal(err)
}
oldDataDB, err := ConfigureV1DataStorage(accCfgIn.DataDbType,
accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName,
accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding)
if err != nil {
log.Fatal(err)
}
accMigrator, err = NewMigrator(dataDBIn, dataDBOut, accCfgIn.DataDbType,
accCfgIn.DBDataEncoding, nil, nil, accCfgIn.StorDBType, oldDataDB,
accCfgIn.DataDbType, accCfgIn.DBDataEncoding, nil, accCfgIn.StorDBType,
false, false, false, false, false)
if err != nil {
log.Fatal(err)
}
}
func TestAccountITMongo(t *testing.T) {
accAction = utils.Migrate
for _, stest := range sTestsAccIT {
t.Run("TestAccountITMigrateMongo", stest)
}
}
func TestAccountITMoveConnection(t *testing.T) {
func TestAccountITMove(t *testing.T) {
var err error
accPathIn = path.Join(*dataDir, "conf", "samples", "tutmongo")
accCfgIn, err = config.NewCGRConfigFromFolder(accPathIn)
@@ -147,6 +96,13 @@ func TestAccountITMoveConnection(t *testing.T) {
if err != nil {
t.Fatal(err)
}
accAction = utils.Move
for _, stest := range sTestsAccIT {
t.Run("TestAccountITMove", stest)
}
}
func testAccITConnect(t *testing.T) {
dataDBIn, err := engine.ConfigureDataStorage(accCfgIn.DataDbType,
accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName,
accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding,
@@ -176,13 +132,6 @@ func TestAccountITMoveConnection(t *testing.T) {
}
}
func TestAccountITMove(t *testing.T) {
accAction = utils.Move
for _, stest := range sTestsAccIT {
t.Run("TestAccountITMove", stest)
}
}
func testAccITFlush(t *testing.T) {
accMigrator.dmOut.DataDB().Flush("")
if err := engine.SetDBVersions(accMigrator.dmOut.DataDB()); err != nil {

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
import (
"encoding/json"
"fmt"
"time"
@@ -58,14 +59,14 @@ func (m *Migrator) migrateCDRs() (err error) {
"version number is not defined for Actions")
}
switch vrs[utils.CDRs] {
case current[utils.CDRs]:
if err := m.migrateCurrentCDRs(); err != nil {
return err
}
case 1:
if err := m.migrateV1CDRs(); err != nil {
return err
}
case current[utils.CDRs]:
if err := m.migrateCurrentCDRs(); err != nil {
return err
}
}
return
}
@@ -162,3 +163,63 @@ func (v1Cdr v1Cdrs) V1toV2Cdr() (cdr *engine.CDR) {
}
return
}
func NewV1CDRFromCDRSql(cdrSql *engine.CDRsql) (cdr *v1Cdrs, err error) {
cdr = new(v1Cdrs)
cdr.CGRID = cdrSql.Cgrid
cdr.RunID = cdrSql.RunID
cdr.OriginHost = cdrSql.OriginHost
cdr.Source = cdrSql.Source
cdr.OriginID = cdrSql.OriginID
cdr.OrderID = cdrSql.ID
cdr.ToR = cdrSql.TOR
cdr.RequestType = cdrSql.RequestType
cdr.Tenant = cdrSql.Tenant
cdr.Category = cdrSql.Category
cdr.Account = cdrSql.Account
cdr.Subject = cdrSql.Subject
cdr.Destination = cdrSql.Destination
cdr.SetupTime = cdrSql.SetupTime
cdr.AnswerTime = cdrSql.AnswerTime
cdr.Usage = time.Duration(cdrSql.Usage)
cdr.CostSource = cdrSql.CostSource
cdr.Cost = cdrSql.Cost
cdr.ExtraInfo = cdrSql.ExtraInfo
if cdrSql.ExtraFields != "" {
if err = json.Unmarshal([]byte(cdrSql.ExtraFields), &cdr.ExtraFields); err != nil {
return nil, err
}
}
if cdrSql.CostDetails != "" {
if err = json.Unmarshal([]byte(cdrSql.CostDetails), &cdr.CostDetails); err != nil {
return nil, err
}
}
return
}
func (cdr v1Cdrs) AsCDRsql() (cdrSql *engine.CDRsql) {
cdrSql = new(engine.CDRsql)
cdrSql.Cgrid = cdr.CGRID
cdrSql.RunID = cdr.RunID
cdrSql.OriginHost = cdr.OriginHost
cdrSql.Source = cdr.Source
cdrSql.OriginID = cdr.OriginID
cdrSql.TOR = cdr.ToR
cdrSql.RequestType = cdr.RequestType
cdrSql.Tenant = cdr.Tenant
cdrSql.Category = cdr.Category
cdrSql.Account = cdr.Account
cdrSql.Subject = cdr.Subject
cdrSql.Destination = cdr.Destination
cdrSql.SetupTime = cdr.SetupTime
cdrSql.AnswerTime = cdr.AnswerTime
cdrSql.Usage = cdr.Usage.Nanoseconds()
cdrSql.ExtraFields = utils.ToJSON(cdr.ExtraFields)
cdrSql.CostSource = cdr.CostSource
cdrSql.Cost = cdr.Cost
cdrSql.CostDetails = utils.ToJSON(cdr.CostDetails)
cdrSql.ExtraInfo = cdr.ExtraInfo
cdrSql.CreatedAt = time.Now()
return
}

View File

@@ -94,6 +94,55 @@ func TestCdrITMongo(t *testing.T) {
}
}
func TestCdrITMySqlConnection(t *testing.T) {
var err error
cdrPathIn = path.Join(*dataDir, "conf", "samples", "tutmysql")
cdrCfgIn, err = config.NewCGRConfigFromFolder(cdrPathIn)
if err != nil {
t.Error(err)
}
storDBIn, err := engine.ConfigureStorDB(cdrCfgIn.StorDBType, cdrCfgIn.StorDBHost,
cdrCfgIn.StorDBPort, cdrCfgIn.StorDBName,
cdrCfgIn.StorDBUser, cdrCfgIn.StorDBPass,
config.CgrConfig().StorDBMaxOpenConns,
config.CgrConfig().StorDBMaxIdleConns,
config.CgrConfig().StorDBConnMaxLifetime,
config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
t.Error(err)
}
storDBOut, err := engine.ConfigureStorDB(cdrCfgIn.StorDBType,
cdrCfgIn.StorDBHost, cdrCfgIn.StorDBPort, cdrCfgIn.StorDBName,
cdrCfgIn.StorDBUser, cdrCfgIn.StorDBPass,
config.CgrConfig().StorDBMaxOpenConns,
config.CgrConfig().StorDBMaxIdleConns,
config.CgrConfig().StorDBConnMaxLifetime,
config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
t.Error(err)
}
oldStorDB, err := ConfigureV1StorDB(cdrCfgIn.StorDBType,
cdrCfgIn.StorDBHost, cdrCfgIn.StorDBPort, cdrCfgIn.StorDBName,
cdrCfgIn.StorDBUser, cdrCfgIn.StorDBPass)
if err != nil {
log.Fatal(err)
}
cdrMigrator, err = NewMigrator(nil, nil, cdrCfgIn.DataDbType,
cdrCfgIn.DBDataEncoding, storDBIn, storDBOut, cdrCfgIn.StorDBType, nil,
cdrCfgIn.DataDbType, cdrCfgIn.DBDataEncoding, oldStorDB, cdrCfgIn.StorDBType,
false, false, false, false, false)
if err != nil {
t.Error(err)
}
}
func TestCdrITMySql(t *testing.T) {
for _, stest := range sTestsCdrIT {
t.Run("TestCdrITMigrateMySql", stest)
}
}
func testCdrITFlush(t *testing.T) {
if err := cdrMigrator.storDBOut.Flush(
path.Join(cdrCfgIn.DataFolderPath, "storage", cdrCfgIn.StorDBType)); err != nil {

View File

@@ -20,6 +20,7 @@ package migrator
import (
"database/sql"
"encoding/json"
"fmt"
"time"
@@ -156,3 +157,31 @@ func (v2Cost v2SessionsCost) V2toV3Cost() (cost *engine.SMCost) {
}
return
}
func NewV2SessionsCostFromSessionsCostSql(smSql *engine.SessionsCostsSQL) (smV2 *v2SessionsCost, err error) {
smV2 = new(v2SessionsCost)
smV2.CGRID = smSql.Cgrid
smV2.RunID = smSql.RunID
smV2.OriginHost = smSql.OriginHost
smV2.OriginID = smSql.OriginID
smV2.CostSource = smSql.CostSource
smV2.Usage = time.Duration(smSql.Usage)
smV2.CostDetails = new(engine.CallCost)
if err := json.Unmarshal([]byte(smSql.CostDetails), smV2.CostDetails); err != nil {
return nil, err
}
return
}
func (v2Cost v2SessionsCost) AsSessionsCostSql() (smSql *engine.SessionsCostsSQL) {
smSql = new(engine.SessionsCostsSQL)
smSql.Cgrid = v2Cost.CGRID
smSql.RunID = v2Cost.RunID
smSql.OriginHost = v2Cost.OriginHost
smSql.OriginID = v2Cost.OriginID
smSql.CostSource = v2Cost.CostSource
smSql.CostDetails = utils.ToJSON(v2Cost.CostDetails)
smSql.Usage = v2Cost.Usage.Nanoseconds()
smSql.CreatedAt = time.Now()
return
}

View File

@@ -94,6 +94,55 @@ func TestSessionCostITMongo(t *testing.T) {
}
}
func TestSessionCostITMySqlConnection(t *testing.T) {
var err error
sCostPathIn = path.Join(*dataDir, "conf", "samples", "tutmysql")
sCostCfgIn, err = config.NewCGRConfigFromFolder(sCostPathIn)
if err != nil {
t.Error(err)
}
storDBIn, err := engine.ConfigureStorDB(sCostCfgIn.StorDBType, sCostCfgIn.StorDBHost,
sCostCfgIn.StorDBPort, sCostCfgIn.StorDBName,
sCostCfgIn.StorDBUser, sCostCfgIn.StorDBPass,
config.CgrConfig().StorDBMaxOpenConns,
config.CgrConfig().StorDBMaxIdleConns,
config.CgrConfig().StorDBConnMaxLifetime,
config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
t.Error(err)
}
storDBOut, err := engine.ConfigureStorDB(sCostCfgIn.StorDBType,
sCostCfgIn.StorDBHost, sCostCfgIn.StorDBPort, sCostCfgIn.StorDBName,
sCostCfgIn.StorDBUser, sCostCfgIn.StorDBPass,
config.CgrConfig().StorDBMaxOpenConns,
config.CgrConfig().StorDBMaxIdleConns,
config.CgrConfig().StorDBConnMaxLifetime,
config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
t.Error(err)
}
oldStorDB, err := ConfigureV1StorDB(sCostCfgIn.StorDBType,
sCostCfgIn.StorDBHost, sCostCfgIn.StorDBPort, sCostCfgIn.StorDBName,
sCostCfgIn.StorDBUser, sCostCfgIn.StorDBPass)
if err != nil {
log.Fatal(err)
}
sCostMigrator, err = NewMigrator(nil, nil, sCostCfgIn.DataDbType,
sCostCfgIn.DBDataEncoding, storDBIn, storDBOut, sCostCfgIn.StorDBType, nil,
sCostCfgIn.DataDbType, sCostCfgIn.DBDataEncoding, oldStorDB, sCostCfgIn.StorDBType,
false, false, false, false, false)
if err != nil {
t.Error(err)
}
}
func TestSessionCostITMySql(t *testing.T) {
for _, stest := range sTestssCostIT {
t.Run("TestSessionSCostITMigrateMySql", stest)
}
}
func testSessionCostITFlush(t *testing.T) {
if err := sCostMigrator.storDBOut.Flush(
path.Join(sCostCfgIn.DataFolderPath, "storage", sCostCfgIn.StorDBType)); err != nil {

View File

@@ -59,6 +59,9 @@ func ConfigureV1StorDB(db_type, host, port, name, user, pass string) (db Migrato
case utils.MONGO:
d, err = newv1MongoStorage(host, port, name, user, pass, utils.StorDB, nil)
db = d.(MigratorStorDB)
case utils.MYSQL:
d, err = newSqlStorage(host, port, name, user, pass)
db = d.(MigratorStorDB)
default:
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s'",
db_type, utils.MONGO))

126
migrator/v1sql.go Executable file
View File

@@ -0,0 +1,126 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"database/sql"
"fmt"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
_ "github.com/go-sql-driver/mysql"
"github.com/jinzhu/gorm"
)
type sqlStorage struct {
Db *sql.DB
db *gorm.DB
rowIter *sql.Rows
}
func newSqlStorage(host, port, name, user, password string) (*sqlStorage, error) {
connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES,NO_AUTO_CREATE_USER'", user, password, host, port, name)
db, err := gorm.Open("mysql", connectString)
if err != nil {
return nil, err
}
if err = db.DB().Ping(); err != nil {
return nil, err
}
return &sqlStorage{Db: db.DB(), db: db}, nil
}
func (sqlStorage *sqlStorage) getV1CDR() (v1Cdr *v1Cdrs, err error) {
if sqlStorage.rowIter == nil {
sqlStorage.rowIter, err = sqlStorage.Db.Query("SELECT * FROM cdrs")
if err != nil {
return nil, err
}
}
cdrSql := new(engine.CDRsql)
sqlStorage.rowIter.Scan(&cdrSql)
v1Cdr, err = NewV1CDRFromCDRSql(cdrSql)
if sqlStorage.rowIter.Next() {
v1Cdr = nil
sqlStorage.rowIter = nil
return nil, utils.ErrNoMoreData
}
return v1Cdr, nil
}
func (sqlStorage *sqlStorage) setV1CDR(v1Cdr *v1Cdrs) (err error) {
tx := sqlStorage.db.Begin()
cdrSql := v1Cdr.AsCDRsql()
cdrSql.CreatedAt = time.Now()
saved := tx.Save(cdrSql)
if saved.Error != nil {
return saved.Error
}
tx.Commit()
return nil
}
func (sqlStorage *sqlStorage) getSMCost() (v2Cost *v2SessionsCost, err error) {
if sqlStorage.rowIter == nil {
sqlStorage.rowIter, err = sqlStorage.Db.Query("SELECT * FROM sessions_costs")
if err != nil {
return nil, err
}
}
scSql := new(engine.SessionsCostsSQL)
sqlStorage.rowIter.Scan(&scSql)
v2Cost, err = NewV2SessionsCostFromSessionsCostSql(scSql)
if sqlStorage.rowIter.Next() {
v2Cost = nil
sqlStorage.rowIter = nil
return nil, utils.ErrNoMoreData
}
return v2Cost, nil
}
func (sqlStorage *sqlStorage) setSMCost(v2Cost *v2SessionsCost) (err error) {
tx := sqlStorage.db.Begin()
smSql := v2Cost.AsSessionsCostSql()
smSql.CreatedAt = time.Now()
saved := tx.Save(smSql)
if saved.Error != nil {
return saved.Error
}
tx.Commit()
return
}
func (sqlStorage *sqlStorage) remSMCost(v2Cost *v2SessionsCost) (err error) {
tx := sqlStorage.db.Begin()
var rmParam *engine.SessionsCostsSQL
if v2Cost != nil {
rmParam = &engine.SessionsCostsSQL{Cgrid: v2Cost.CGRID,
RunID: v2Cost.RunID}
}
if err := tx.Where(rmParam).Delete(engine.SessionsCostsSQL{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return nil
}