diff --git a/migrator/accounts.go b/migrator/accounts.go index f5807691f..9db3c20cc 100755 --- a/migrator/accounts.go +++ b/migrator/accounts.go @@ -143,22 +143,19 @@ func (m *Migrator) migrateAccounts() (err error) { } current := engine.CurrentDataDBVersions() switch vrs[utils.Accounts] { - case current[utils.Accounts]: - if m.sameDataDB { - return - } - if err := m.migrateCurrentAccounts(); err != nil { - return err - } - return + case 1: - if err := m.migrateV1Accounts(); err != nil { - return err - } + return m.migrateV1Accounts() case 2: if err := m.migrateV2Accounts(); err != nil { return err } + fallthrough + case current[utils.Accounts]: + if m.sameDataDB { + return + } + return m.migrateCurrentAccounts() } return } diff --git a/migrator/accounts_it_test.go b/migrator/accounts_it_test.go index a284e791c..f266b4238 100755 --- a/migrator/accounts_it_test.go +++ b/migrator/accounts_it_test.go @@ -45,97 +45,46 @@ var ( ) var sTestsAccIT = []func(t *testing.T){ + testAccITConnect, testAccITFlush, testAccITMigrateAndMove, } -func TestAccountITRedisConnection(t *testing.T) { +func TestAccountITRedis(t *testing.T) { var err error accPathIn = path.Join(*dataDir, "conf", "samples", "tutmysql") accCfgIn, err = config.NewCGRConfigFromFolder(accPathIn) if err != nil { t.Fatal(err) } - dataDBIn, err := engine.ConfigureDataStorage(accCfgIn.DataDbType, - accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName, - accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding, - config.CgrConfig().CacheCfg(), *loadHistorySize) + accCfgOut, err = config.NewCGRConfigFromFolder(accPathIn) if err != nil { - log.Fatal(err) + t.Fatal(err) } - dataDBOut, err := engine.ConfigureDataStorage(accCfgIn.DataDbType, - accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName, - accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding, - config.CgrConfig().CacheCfg(), *loadHistorySize) - if err != nil { - log.Fatal(err) - } - oldDataDB, err := ConfigureV1DataStorage(accCfgIn.DataDbType, - accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName, - accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding) - if err != nil { - log.Fatal(err) - } - accMigrator, err = NewMigrator(dataDBIn, dataDBOut, accCfgIn.DataDbType, - accCfgIn.DBDataEncoding, nil, nil, accCfgIn.StorDBType, oldDataDB, - accCfgIn.DataDbType, accCfgIn.DBDataEncoding, nil, accCfgIn.StorDBType, - false, false, false, false, false) - if err != nil { - log.Fatal(err) - } -} - -func TestAccountITRedis(t *testing.T) { accAction = utils.Migrate for _, stest := range sTestsAccIT { t.Run("TestAccountITMigrateRedis", stest) } } -func TestAccountITMongoConnection(t *testing.T) { +func TestAccountITMongo(t *testing.T) { var err error accPathIn = path.Join(*dataDir, "conf", "samples", "tutmongo") accCfgIn, err = config.NewCGRConfigFromFolder(accPathIn) if err != nil { t.Fatal(err) } - dataDBIn, err := engine.ConfigureDataStorage(accCfgIn.DataDbType, - accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName, - accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding, - config.CgrConfig().CacheCfg(), *loadHistorySize) + accCfgOut, err = config.NewCGRConfigFromFolder(accPathIn) if err != nil { - log.Fatal(err) + t.Fatal(err) } - dataDBOut, err := engine.ConfigureDataStorage(accCfgIn.DataDbType, - accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName, - accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding, - config.CgrConfig().CacheCfg(), *loadHistorySize) - if err != nil { - log.Fatal(err) - } - oldDataDB, err := ConfigureV1DataStorage(accCfgIn.DataDbType, - accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName, - accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding) - if err != nil { - log.Fatal(err) - } - accMigrator, err = NewMigrator(dataDBIn, dataDBOut, accCfgIn.DataDbType, - accCfgIn.DBDataEncoding, nil, nil, accCfgIn.StorDBType, oldDataDB, - accCfgIn.DataDbType, accCfgIn.DBDataEncoding, nil, accCfgIn.StorDBType, - false, false, false, false, false) - if err != nil { - log.Fatal(err) - } -} - -func TestAccountITMongo(t *testing.T) { accAction = utils.Migrate for _, stest := range sTestsAccIT { t.Run("TestAccountITMigrateMongo", stest) } } -func TestAccountITMoveConnection(t *testing.T) { +func TestAccountITMove(t *testing.T) { var err error accPathIn = path.Join(*dataDir, "conf", "samples", "tutmongo") accCfgIn, err = config.NewCGRConfigFromFolder(accPathIn) @@ -147,6 +96,13 @@ func TestAccountITMoveConnection(t *testing.T) { if err != nil { t.Fatal(err) } + accAction = utils.Move + for _, stest := range sTestsAccIT { + t.Run("TestAccountITMove", stest) + } +} + +func testAccITConnect(t *testing.T) { dataDBIn, err := engine.ConfigureDataStorage(accCfgIn.DataDbType, accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName, accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding, @@ -176,13 +132,6 @@ func TestAccountITMoveConnection(t *testing.T) { } } -func TestAccountITMove(t *testing.T) { - accAction = utils.Move - for _, stest := range sTestsAccIT { - t.Run("TestAccountITMove", stest) - } -} - func testAccITFlush(t *testing.T) { accMigrator.dmOut.DataDB().Flush("") if err := engine.SetDBVersions(accMigrator.dmOut.DataDB()); err != nil { diff --git a/migrator/cdrs.go b/migrator/cdrs.go index d762d614c..86e0b68af 100755 --- a/migrator/cdrs.go +++ b/migrator/cdrs.go @@ -19,6 +19,7 @@ along with this program. If not, see package migrator import ( + "encoding/json" "fmt" "time" @@ -58,14 +59,14 @@ func (m *Migrator) migrateCDRs() (err error) { "version number is not defined for Actions") } switch vrs[utils.CDRs] { - case current[utils.CDRs]: - if err := m.migrateCurrentCDRs(); err != nil { - return err - } case 1: if err := m.migrateV1CDRs(); err != nil { return err } + case current[utils.CDRs]: + if err := m.migrateCurrentCDRs(); err != nil { + return err + } } return } @@ -162,3 +163,63 @@ func (v1Cdr v1Cdrs) V1toV2Cdr() (cdr *engine.CDR) { } return } + +func NewV1CDRFromCDRSql(cdrSql *engine.CDRsql) (cdr *v1Cdrs, err error) { + cdr = new(v1Cdrs) + cdr.CGRID = cdrSql.Cgrid + cdr.RunID = cdrSql.RunID + cdr.OriginHost = cdrSql.OriginHost + cdr.Source = cdrSql.Source + cdr.OriginID = cdrSql.OriginID + cdr.OrderID = cdrSql.ID + cdr.ToR = cdrSql.TOR + cdr.RequestType = cdrSql.RequestType + cdr.Tenant = cdrSql.Tenant + cdr.Category = cdrSql.Category + cdr.Account = cdrSql.Account + cdr.Subject = cdrSql.Subject + cdr.Destination = cdrSql.Destination + cdr.SetupTime = cdrSql.SetupTime + cdr.AnswerTime = cdrSql.AnswerTime + cdr.Usage = time.Duration(cdrSql.Usage) + cdr.CostSource = cdrSql.CostSource + cdr.Cost = cdrSql.Cost + cdr.ExtraInfo = cdrSql.ExtraInfo + if cdrSql.ExtraFields != "" { + if err = json.Unmarshal([]byte(cdrSql.ExtraFields), &cdr.ExtraFields); err != nil { + return nil, err + } + } + if cdrSql.CostDetails != "" { + if err = json.Unmarshal([]byte(cdrSql.CostDetails), &cdr.CostDetails); err != nil { + return nil, err + } + } + return +} + +func (cdr v1Cdrs) AsCDRsql() (cdrSql *engine.CDRsql) { + cdrSql = new(engine.CDRsql) + cdrSql.Cgrid = cdr.CGRID + cdrSql.RunID = cdr.RunID + cdrSql.OriginHost = cdr.OriginHost + cdrSql.Source = cdr.Source + cdrSql.OriginID = cdr.OriginID + cdrSql.TOR = cdr.ToR + cdrSql.RequestType = cdr.RequestType + cdrSql.Tenant = cdr.Tenant + cdrSql.Category = cdr.Category + cdrSql.Account = cdr.Account + cdrSql.Subject = cdr.Subject + cdrSql.Destination = cdr.Destination + cdrSql.SetupTime = cdr.SetupTime + cdrSql.AnswerTime = cdr.AnswerTime + cdrSql.Usage = cdr.Usage.Nanoseconds() + cdrSql.ExtraFields = utils.ToJSON(cdr.ExtraFields) + cdrSql.CostSource = cdr.CostSource + cdrSql.Cost = cdr.Cost + cdrSql.CostDetails = utils.ToJSON(cdr.CostDetails) + cdrSql.ExtraInfo = cdr.ExtraInfo + cdrSql.CreatedAt = time.Now() + return +} diff --git a/migrator/cdrs_it_test.go b/migrator/cdrs_it_test.go index fb53149ce..d98382c1e 100755 --- a/migrator/cdrs_it_test.go +++ b/migrator/cdrs_it_test.go @@ -94,6 +94,55 @@ func TestCdrITMongo(t *testing.T) { } } +func TestCdrITMySqlConnection(t *testing.T) { + var err error + cdrPathIn = path.Join(*dataDir, "conf", "samples", "tutmysql") + cdrCfgIn, err = config.NewCGRConfigFromFolder(cdrPathIn) + if err != nil { + t.Error(err) + } + storDBIn, err := engine.ConfigureStorDB(cdrCfgIn.StorDBType, cdrCfgIn.StorDBHost, + cdrCfgIn.StorDBPort, cdrCfgIn.StorDBName, + cdrCfgIn.StorDBUser, cdrCfgIn.StorDBPass, + config.CgrConfig().StorDBMaxOpenConns, + config.CgrConfig().StorDBMaxIdleConns, + config.CgrConfig().StorDBConnMaxLifetime, + config.CgrConfig().StorDBCDRSIndexes) + if err != nil { + t.Error(err) + } + storDBOut, err := engine.ConfigureStorDB(cdrCfgIn.StorDBType, + cdrCfgIn.StorDBHost, cdrCfgIn.StorDBPort, cdrCfgIn.StorDBName, + cdrCfgIn.StorDBUser, cdrCfgIn.StorDBPass, + config.CgrConfig().StorDBMaxOpenConns, + config.CgrConfig().StorDBMaxIdleConns, + config.CgrConfig().StorDBConnMaxLifetime, + config.CgrConfig().StorDBCDRSIndexes) + if err != nil { + t.Error(err) + } + oldStorDB, err := ConfigureV1StorDB(cdrCfgIn.StorDBType, + cdrCfgIn.StorDBHost, cdrCfgIn.StorDBPort, cdrCfgIn.StorDBName, + cdrCfgIn.StorDBUser, cdrCfgIn.StorDBPass) + if err != nil { + log.Fatal(err) + } + + cdrMigrator, err = NewMigrator(nil, nil, cdrCfgIn.DataDbType, + cdrCfgIn.DBDataEncoding, storDBIn, storDBOut, cdrCfgIn.StorDBType, nil, + cdrCfgIn.DataDbType, cdrCfgIn.DBDataEncoding, oldStorDB, cdrCfgIn.StorDBType, + false, false, false, false, false) + if err != nil { + t.Error(err) + } +} + +func TestCdrITMySql(t *testing.T) { + for _, stest := range sTestsCdrIT { + t.Run("TestCdrITMigrateMySql", stest) + } +} + func testCdrITFlush(t *testing.T) { if err := cdrMigrator.storDBOut.Flush( path.Join(cdrCfgIn.DataFolderPath, "storage", cdrCfgIn.StorDBType)); err != nil { diff --git a/migrator/sessions_costs.go b/migrator/sessions_costs.go index 9bc13d841..0924c1463 100644 --- a/migrator/sessions_costs.go +++ b/migrator/sessions_costs.go @@ -20,6 +20,7 @@ package migrator import ( "database/sql" + "encoding/json" "fmt" "time" @@ -156,3 +157,31 @@ func (v2Cost v2SessionsCost) V2toV3Cost() (cost *engine.SMCost) { } return } + +func NewV2SessionsCostFromSessionsCostSql(smSql *engine.SessionsCostsSQL) (smV2 *v2SessionsCost, err error) { + smV2 = new(v2SessionsCost) + smV2.CGRID = smSql.Cgrid + smV2.RunID = smSql.RunID + smV2.OriginHost = smSql.OriginHost + smV2.OriginID = smSql.OriginID + smV2.CostSource = smSql.CostSource + smV2.Usage = time.Duration(smSql.Usage) + smV2.CostDetails = new(engine.CallCost) + if err := json.Unmarshal([]byte(smSql.CostDetails), smV2.CostDetails); err != nil { + return nil, err + } + return +} + +func (v2Cost v2SessionsCost) AsSessionsCostSql() (smSql *engine.SessionsCostsSQL) { + smSql = new(engine.SessionsCostsSQL) + smSql.Cgrid = v2Cost.CGRID + smSql.RunID = v2Cost.RunID + smSql.OriginHost = v2Cost.OriginHost + smSql.OriginID = v2Cost.OriginID + smSql.CostSource = v2Cost.CostSource + smSql.CostDetails = utils.ToJSON(v2Cost.CostDetails) + smSql.Usage = v2Cost.Usage.Nanoseconds() + smSql.CreatedAt = time.Now() + return +} diff --git a/migrator/sessions_costs_it_test.go b/migrator/sessions_costs_it_test.go index ca7fc2368..dcbda2c6b 100755 --- a/migrator/sessions_costs_it_test.go +++ b/migrator/sessions_costs_it_test.go @@ -94,6 +94,55 @@ func TestSessionCostITMongo(t *testing.T) { } } +func TestSessionCostITMySqlConnection(t *testing.T) { + var err error + sCostPathIn = path.Join(*dataDir, "conf", "samples", "tutmysql") + sCostCfgIn, err = config.NewCGRConfigFromFolder(sCostPathIn) + if err != nil { + t.Error(err) + } + storDBIn, err := engine.ConfigureStorDB(sCostCfgIn.StorDBType, sCostCfgIn.StorDBHost, + sCostCfgIn.StorDBPort, sCostCfgIn.StorDBName, + sCostCfgIn.StorDBUser, sCostCfgIn.StorDBPass, + config.CgrConfig().StorDBMaxOpenConns, + config.CgrConfig().StorDBMaxIdleConns, + config.CgrConfig().StorDBConnMaxLifetime, + config.CgrConfig().StorDBCDRSIndexes) + if err != nil { + t.Error(err) + } + storDBOut, err := engine.ConfigureStorDB(sCostCfgIn.StorDBType, + sCostCfgIn.StorDBHost, sCostCfgIn.StorDBPort, sCostCfgIn.StorDBName, + sCostCfgIn.StorDBUser, sCostCfgIn.StorDBPass, + config.CgrConfig().StorDBMaxOpenConns, + config.CgrConfig().StorDBMaxIdleConns, + config.CgrConfig().StorDBConnMaxLifetime, + config.CgrConfig().StorDBCDRSIndexes) + if err != nil { + t.Error(err) + } + oldStorDB, err := ConfigureV1StorDB(sCostCfgIn.StorDBType, + sCostCfgIn.StorDBHost, sCostCfgIn.StorDBPort, sCostCfgIn.StorDBName, + sCostCfgIn.StorDBUser, sCostCfgIn.StorDBPass) + if err != nil { + log.Fatal(err) + } + + sCostMigrator, err = NewMigrator(nil, nil, sCostCfgIn.DataDbType, + sCostCfgIn.DBDataEncoding, storDBIn, storDBOut, sCostCfgIn.StorDBType, nil, + sCostCfgIn.DataDbType, sCostCfgIn.DBDataEncoding, oldStorDB, sCostCfgIn.StorDBType, + false, false, false, false, false) + if err != nil { + t.Error(err) + } +} + +func TestSessionCostITMySql(t *testing.T) { + for _, stest := range sTestssCostIT { + t.Run("TestSessionSCostITMigrateMySql", stest) + } +} + func testSessionCostITFlush(t *testing.T) { if err := sCostMigrator.storDBOut.Flush( path.Join(sCostCfgIn.DataFolderPath, "storage", sCostCfgIn.StorDBType)); err != nil { diff --git a/migrator/v1migrator_utils.go b/migrator/v1migrator_utils.go index aad80d858..335a8dfdf 100644 --- a/migrator/v1migrator_utils.go +++ b/migrator/v1migrator_utils.go @@ -59,6 +59,9 @@ func ConfigureV1StorDB(db_type, host, port, name, user, pass string) (db Migrato case utils.MONGO: d, err = newv1MongoStorage(host, port, name, user, pass, utils.StorDB, nil) db = d.(MigratorStorDB) + case utils.MYSQL: + d, err = newSqlStorage(host, port, name, user, pass) + db = d.(MigratorStorDB) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s'", db_type, utils.MONGO)) diff --git a/migrator/v1sql.go b/migrator/v1sql.go new file mode 100755 index 000000000..cbfb16ea5 --- /dev/null +++ b/migrator/v1sql.go @@ -0,0 +1,126 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package migrator + +import ( + "database/sql" + "fmt" + "time" + + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + _ "github.com/go-sql-driver/mysql" + "github.com/jinzhu/gorm" +) + +type sqlStorage struct { + Db *sql.DB + db *gorm.DB + rowIter *sql.Rows +} + +func newSqlStorage(host, port, name, user, password string) (*sqlStorage, error) { + connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES,NO_AUTO_CREATE_USER'", user, password, host, port, name) + db, err := gorm.Open("mysql", connectString) + if err != nil { + return nil, err + } + if err = db.DB().Ping(); err != nil { + return nil, err + } + return &sqlStorage{Db: db.DB(), db: db}, nil +} + +func (sqlStorage *sqlStorage) getV1CDR() (v1Cdr *v1Cdrs, err error) { + if sqlStorage.rowIter == nil { + sqlStorage.rowIter, err = sqlStorage.Db.Query("SELECT * FROM cdrs") + if err != nil { + return nil, err + } + } + cdrSql := new(engine.CDRsql) + sqlStorage.rowIter.Scan(&cdrSql) + v1Cdr, err = NewV1CDRFromCDRSql(cdrSql) + + if sqlStorage.rowIter.Next() { + v1Cdr = nil + sqlStorage.rowIter = nil + return nil, utils.ErrNoMoreData + } + return v1Cdr, nil +} + +func (sqlStorage *sqlStorage) setV1CDR(v1Cdr *v1Cdrs) (err error) { + tx := sqlStorage.db.Begin() + cdrSql := v1Cdr.AsCDRsql() + cdrSql.CreatedAt = time.Now() + saved := tx.Save(cdrSql) + if saved.Error != nil { + return saved.Error + } + tx.Commit() + return nil +} + +func (sqlStorage *sqlStorage) getSMCost() (v2Cost *v2SessionsCost, err error) { + if sqlStorage.rowIter == nil { + sqlStorage.rowIter, err = sqlStorage.Db.Query("SELECT * FROM sessions_costs") + if err != nil { + return nil, err + } + } + scSql := new(engine.SessionsCostsSQL) + sqlStorage.rowIter.Scan(&scSql) + v2Cost, err = NewV2SessionsCostFromSessionsCostSql(scSql) + + if sqlStorage.rowIter.Next() { + v2Cost = nil + sqlStorage.rowIter = nil + return nil, utils.ErrNoMoreData + } + return v2Cost, nil +} + +func (sqlStorage *sqlStorage) setSMCost(v2Cost *v2SessionsCost) (err error) { + tx := sqlStorage.db.Begin() + smSql := v2Cost.AsSessionsCostSql() + smSql.CreatedAt = time.Now() + saved := tx.Save(smSql) + if saved.Error != nil { + return saved.Error + } + tx.Commit() + return +} + +func (sqlStorage *sqlStorage) remSMCost(v2Cost *v2SessionsCost) (err error) { + tx := sqlStorage.db.Begin() + var rmParam *engine.SessionsCostsSQL + if v2Cost != nil { + rmParam = &engine.SessionsCostsSQL{Cgrid: v2Cost.CGRID, + RunID: v2Cost.RunID} + } + if err := tx.Where(rmParam).Delete(engine.SessionsCostsSQL{}).Error; err != nil { + tx.Rollback() + return err + } + tx.Commit() + return nil + +}