Finish new infrastructure for migrator

This commit is contained in:
TeoV
2018-05-09 11:05:35 -04:00
committed by Dan Christian Bogos
parent 3bc1cd511a
commit 0708890f78
47 changed files with 347 additions and 325 deletions

View File

@@ -25,7 +25,6 @@ import (
"strings"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/migrator"
"github.com/cgrates/cgrates/utils"
)
@@ -34,15 +33,11 @@ const ()
var (
sameDataDB, sameStorDB bool
dmIN, dmOUT *engine.DataManager
outDataDB migrator.MigratorDataDB
storDBIn, storDBOut engine.StorDB
dmIN, dmOUT migrator.MigratorDataDB
storDBIn, storDBOut migrator.MigratorStorDB
err error
oDBDataEncoding string
dfltCfg = config.CgrConfig()
cfgDir = flag.String("config_dir", "",
dfltCfg = config.CgrConfig()
cfgDir = flag.String("config_dir", "",
"Configuration directory path.")
migrate = flag.String("migrate", "", "fire up automatic migration "+
@@ -106,10 +101,6 @@ var (
outStorDBPass = flag.String("out_stordb_passwd", utils.MetaStorDB,
"output StorDB password")
datadb_versions = flag.Bool("datadb_versions", false,
"print DataDB versions")
stordb_versions = flag.Bool("stordb_versions", false,
"print StorDB versions")
dryRun = flag.Bool("dry_run", false,
"parse loaded data for consistency and errors, without storing it")
verbose = flag.Bool("verbose", false, "enable detailed verbose logging output")
@@ -227,7 +218,7 @@ func main() {
*outStorDBName == mgrCfg.StorDBName &&
*outDBDataEncoding == mgrCfg.DBDataEncoding
if dmIN, err = engine.ConfigureDataStorage(mgrCfg.DataDbType,
if dmIN, err = migrator.NewMigratorDataDB(mgrCfg.DataDbType,
mgrCfg.DataDbHost, mgrCfg.DataDbPort,
mgrCfg.DataDbName, mgrCfg.DataDbUser,
mgrCfg.DataDbPass, mgrCfg.DBDataEncoding,
@@ -237,7 +228,7 @@ func main() {
if sameDataDB {
dmOUT = dmIN
} else if dmOUT, err = engine.ConfigureDataStorage(*outDataDBType,
} else if dmOUT, err = migrator.NewMigratorDataDB(*outDataDBType,
*outDataDBHost, *outDataDBPort,
*outDataDBName, *outDataDBUser,
*outDataDBPass, *outDBDataEncoding,
@@ -245,7 +236,7 @@ func main() {
log.Fatal(err)
}
if storDBIn, err = engine.ConfigureStorDB(*inStorDBType,
if storDBIn, err = migrator.NewMigratorStorDB(*inStorDBType,
*inStorDBHost, *inStorDBPort,
*inStorDBName, *inStorDBUser, *inStorDBPass,
config.CgrConfig().StorDBMaxOpenConns,
@@ -257,7 +248,7 @@ func main() {
if sameStorDB {
storDBOut = storDBIn
} else if storDBOut, err = engine.ConfigureStorDB(*outStorDBType,
} else if storDBOut, err = migrator.NewMigratorStorDB(*outStorDBType,
*outStorDBHost, *outStorDBPort,
*outStorDBName, *outStorDBUser,
*outStorDBPass,
@@ -268,47 +259,12 @@ func main() {
log.Fatal(err)
}
var outDataDB migrator.MigratorDataDB
if outDataDB, err = migrator.ConfigureV1DataStorage(*inDataDBType,
*inDataDBHost, *inDataDBPort,
*inDataDBName, *inDataDBUser,
*inDataDBPass, *inDBDataEncoding); err != nil {
log.Fatal(err)
}
var outStorDB migrator.MigratorStorDB
/* FixMe with interfaces
if outStorDB, err = migrator.ConfigureV1StorDB(*inStorDBType,
*inStorDBHost, *inStorDBPort,
*inStorDBName, *inStorDBUser,
*inStorDBPass); err != nil {
log.Fatal(err)
}
*/
m, err := migrator.NewMigrator(dmIN, dmOUT, mgrCfg.DataDbType, mgrCfg.DBDataEncoding,
storDBIn, storDBOut, mgrCfg.StorDBType, outDataDB,
*outDataDBType, mgrCfg.DBDataEncoding, outStorDB,
*outStorDBType, *dryRun, sameDataDB, sameStorDB, *datadb_versions, *stordb_versions)
m, err := migrator.NewMigrator(dmIN, dmOUT,
storDBIn, storDBOut,
*dryRun, sameDataDB, sameStorDB)
if err != nil {
log.Fatal(err)
}
if *datadb_versions {
vrs, _ := dmOUT.DataDB().GetVersions("")
if len(vrs) != 0 {
log.Printf("DataDB versions : %+v\n", vrs)
} else {
log.Printf("DataDB versions not_found")
}
}
if *stordb_versions {
vrs, _ := storDBOut.GetVersions("")
if len(vrs) != 0 {
log.Printf("StorDB versions : %+v\n", vrs)
} else {
log.Printf("StorDB versions not_found")
}
}
if migrate != nil && *migrate != "" { // Run migrator
migrstats := make(map[string]int)
mig := strings.Split(*migrate, ",")

View File

@@ -51,6 +51,10 @@ func (self *SQLStorage) Close() {
self.db.Close()
}
func (self *SQLStorage) ExportGormDB() *gorm.DB {
return self.db
}
func (self *SQLStorage) Flush(scriptsPath string) (err error) {
for _, scriptName := range []string{utils.CREATE_CDRS_TABLES_SQL, utils.CREATE_TARIFFPLAN_TABLES_SQL} {
if err := self.CreateTablesFromScript(path.Join(scriptsPath, scriptName)); err != nil {

View File

@@ -103,38 +103,31 @@ func TestAccountITMove(t *testing.T) {
}
func testAccITConnect(t *testing.T) {
dataDBIn, err := engine.ConfigureDataStorage(accCfgIn.DataDbType,
dataDBIn, err := NewMigratorDataDB(accCfgIn.DataDbType,
accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName,
accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding,
config.CgrConfig().CacheCfg(), *loadHistorySize)
if err != nil {
log.Fatal(err)
}
dataDBOut, err := engine.ConfigureDataStorage(accCfgOut.DataDbType,
dataDBOut, err := NewMigratorDataDB(accCfgOut.DataDbType,
accCfgOut.DataDbHost, accCfgOut.DataDbPort, accCfgOut.DataDbName,
accCfgOut.DataDbUser, accCfgOut.DataDbPass, accCfgOut.DBDataEncoding,
config.CgrConfig().CacheCfg(), *loadHistorySize)
if err != nil {
log.Fatal(err)
}
oldDataDB, err := ConfigureV1DataStorage(accCfgIn.DataDbType,
accCfgIn.DataDbHost, accCfgIn.DataDbPort, accCfgIn.DataDbName,
accCfgIn.DataDbUser, accCfgIn.DataDbPass, accCfgIn.DBDataEncoding)
if err != nil {
log.Fatal(err)
}
accMigrator, err = NewMigrator(dataDBIn, dataDBOut, accCfgIn.DataDbType,
accCfgIn.DBDataEncoding, nil, nil, accCfgIn.StorDBType, oldDataDB,
accCfgIn.DataDbType, accCfgIn.DBDataEncoding, nil, accCfgIn.StorDBType,
false, false, false, false, false)
accMigrator, err = NewMigrator(dataDBIn, dataDBOut,
nil, nil,
false, false, false)
if err != nil {
log.Fatal(err)
}
}
func testAccITFlush(t *testing.T) {
accMigrator.dmOut.DataDB().Flush("")
if err := engine.SetDBVersions(accMigrator.dmOut.DataDB()); err != nil {
accMigrator.dmOut.DataManager().DataDB().Flush("")
if err := engine.SetDBVersions(accMigrator.dmOut.DataManager().DataDB()); err != nil {
t.Error("Error ", err.Error())
}
}
@@ -214,7 +207,7 @@ func testAccITMigrateAndMove(t *testing.T) {
ActionTriggers: engine.ActionTriggers{}}
switch accAction {
case utils.Migrate:
err := accMigrator.oldDataDB.setV1Account(v1Acc)
err := accMigrator.dmIN.setV1Account(v1Acc)
if err != nil {
t.Error("Error when setting v1 Accounts ", err.Error())
}
@@ -226,12 +219,12 @@ func testAccITMigrateAndMove(t *testing.T) {
utils.ActionTriggers: 2,
utils.ActionPlans: 2,
utils.SharedGroups: 2}
err = accMigrator.dmOut.DataDB().SetVersions(currentVersion, false)
err = accMigrator.dmOut.DataManager().DataDB().SetVersions(currentVersion, false)
if err != nil {
t.Error("Error when setting version for Accounts ", err.Error())
}
if vrs, err := accMigrator.dmOut.DataDB().GetVersions(""); err != nil {
if vrs, err := accMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil {
t.Error(err)
} else if vrs[utils.Accounts] != 1 {
t.Errorf("Unexpected version returned: %d", vrs[utils.Accounts])
@@ -242,13 +235,13 @@ func testAccITMigrateAndMove(t *testing.T) {
t.Error("Error when migrating Accounts ", err.Error())
}
if vrs, err := accMigrator.dmOut.DataDB().GetVersions(""); err != nil {
if vrs, err := accMigrator.dmOut.DataManager().DataDB().GetVersions(""); err != nil {
t.Error(err)
} else if vrs[utils.Accounts] != 3 {
t.Errorf("Unexpected version returned: %d", vrs[utils.Accounts])
}
result, err := accMigrator.dmOut.DataDB().GetAccount(testAccount.ID)
result, err := accMigrator.dmOut.DataManager().DataDB().GetAccount(testAccount.ID)
if err != nil {
t.Error("Error when getting Accounts ", err.Error())
}
@@ -258,11 +251,11 @@ func testAccITMigrateAndMove(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", testAccount, result)
}
case utils.Move:
if err := accMigrator.dmIN.DataDB().SetAccount(testAccount); err != nil {
if err := accMigrator.dmIN.DataManager().DataDB().SetAccount(testAccount); err != nil {
log.Print("GOT ERR DMIN", err)
}
currentVersion := engine.CurrentDataDBVersions()
err := accMigrator.dmOut.DataDB().SetVersions(currentVersion, false)
err := accMigrator.dmOut.DataManager().DataDB().SetVersions(currentVersion, false)
if err != nil {
t.Error("Error when setting version for Accounts ", err.Error())
}
@@ -270,14 +263,14 @@ func testAccITMigrateAndMove(t *testing.T) {
if err != nil {
t.Error("Error when accMigratorrating Accounts ", err.Error())
}
result, err := accMigrator.dmOut.DataDB().GetAccount(testAccount.ID)
result, err := accMigrator.dmOut.DataManager().DataDB().GetAccount(testAccount.ID)
if err != nil {
t.Error(err)
}
if !reflect.DeepEqual(testAccount, result) {
t.Errorf("Expecting: %+v, received: %+v", testAccount, result)
}
result, err = accMigrator.dmIN.DataDB().GetAccount(testAccount.ID)
result, err = accMigrator.dmIN.DataManager().DataDB().GetAccount(testAccount.ID)
if err != utils.ErrNotFound {
t.Error(err)
}

View File

@@ -21,7 +21,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
import (
"log"
"path"
"testing"
"time"
@@ -71,7 +70,7 @@ func TestCdrITMySql(t *testing.T) {
}
func testCdrITConnect(t *testing.T) {
storDBIn, err := engine.ConfigureStorDB(cdrCfgIn.StorDBType, cdrCfgIn.StorDBHost,
storDBIn, err := NewMigratorStorDB(cdrCfgIn.StorDBType, cdrCfgIn.StorDBHost,
cdrCfgIn.StorDBPort, cdrCfgIn.StorDBName,
cdrCfgIn.StorDBUser, cdrCfgIn.StorDBPass,
config.CgrConfig().StorDBMaxOpenConns,
@@ -81,7 +80,7 @@ func testCdrITConnect(t *testing.T) {
if err != nil {
t.Error(err)
}
storDBOut, err := engine.ConfigureStorDB(cdrCfgIn.StorDBType,
storDBOut, err := NewMigratorStorDB(cdrCfgIn.StorDBType,
cdrCfgIn.StorDBHost, cdrCfgIn.StorDBPort, cdrCfgIn.StorDBName,
cdrCfgIn.StorDBUser, cdrCfgIn.StorDBPass,
config.CgrConfig().StorDBMaxOpenConns,
@@ -91,24 +90,17 @@ func testCdrITConnect(t *testing.T) {
if err != nil {
t.Error(err)
}
oldStorDB, err := ConfigureV1StorDB(cdrCfgIn.StorDBType,
cdrCfgIn.StorDBHost, cdrCfgIn.StorDBPort, cdrCfgIn.StorDBName,
cdrCfgIn.StorDBUser, cdrCfgIn.StorDBPass)
if err != nil {
log.Fatal(err)
}
cdrMigrator, err = NewMigrator(nil, nil, cdrCfgIn.DataDbType,
cdrCfgIn.DBDataEncoding, storDBIn, storDBOut, cdrCfgIn.StorDBType, nil,
cdrCfgIn.DataDbType, cdrCfgIn.DBDataEncoding, oldStorDB, cdrCfgIn.StorDBType,
false, false, false, false, false)
cdrMigrator, err = NewMigrator(nil, nil,
storDBIn, storDBOut,
false, false, false)
if err != nil {
t.Error(err)
}
}
func testCdrITFlush(t *testing.T) {
if err := cdrMigrator.storDBOut.Flush(
if err := cdrMigrator.storDBOut.StorDB().Flush(
path.Join(cdrCfgIn.DataFolderPath, "storage", cdrCfgIn.StorDBType)); err != nil {
t.Error(err)
}
@@ -152,18 +144,18 @@ func testCdrITMigrateAndMove(t *testing.T) {
CostDetails: cc,
}
var err error
if err = cdrMigrator.oldStorDB.setV1CDR(v1Cdr); err != nil {
if err = cdrMigrator.storDBIn.setV1CDR(v1Cdr); err != nil {
t.Error(err)
}
currentVersion := engine.Versions{
utils.COST_DETAILS: 2,
utils.CDRs: 1,
}
err = cdrMigrator.storDBOut.SetVersions(currentVersion, false)
err = cdrMigrator.storDBOut.StorDB().SetVersions(currentVersion, false)
if err != nil {
t.Error("Error when setting version for CDRs ", err.Error())
}
if vrs, err := cdrMigrator.storDBOut.GetVersions(""); err != nil {
if vrs, err := cdrMigrator.storDBOut.StorDB().GetVersions(""); err != nil {
t.Error(err)
} else if vrs[utils.CDRs] != 1 {
t.Errorf("Unexpected version returned: %d", vrs[utils.CDRs])
@@ -172,12 +164,12 @@ func testCdrITMigrateAndMove(t *testing.T) {
if err != nil {
t.Error("Error when migrating CDRs ", err.Error())
}
if rcvCDRs, _, err := cdrMigrator.storDBOut.GetCDRs(new(utils.CDRsFilter), false); err != nil {
if rcvCDRs, _, err := cdrMigrator.storDBOut.StorDB().GetCDRs(new(utils.CDRsFilter), false); err != nil {
t.Error(err)
} else if len(rcvCDRs) != 1 {
t.Errorf("Unexpected number of CDRs returned: %d", len(rcvCDRs))
}
if vrs, err := cdrMigrator.storDBOut.GetVersions(""); err != nil {
if vrs, err := cdrMigrator.storDBOut.StorDB().GetVersions(""); err != nil {
t.Error(err)
} else if vrs[utils.CDRs] != 2 {
t.Errorf("Unexpected version returned: %d", vrs[utils.CDRs])

View File

@@ -17,6 +17,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
/*
import (
"encoding/json"
"testing"
@@ -45,3 +46,4 @@ func TestV1CostDetailsAsCostDetails2(t *testing.T) {
_ = v1CC.AsCallCost()
}
*/

View File

@@ -23,7 +23,6 @@ import (
)
type MigratorDataDB interface {
getKeysForPrefix(prefix string) ([]string, error)
getv1Account() (v1Acnt *v1Account, err error)
setV1Account(x *v1Account) (err error)
getV1ActionPlans() (v1aps *v1ActionPlans, err error)

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -216,3 +217,4 @@ func testSessionCostITMigrateAndMove(t *testing.T) {
t.Errorf("Unexpected version returned: %d", vrs[utils.SessionSCosts])
}
}
*/

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -318,3 +319,4 @@ func testStsITMigrateAndMove(t *testing.T) {
}
}
*/

View File

@@ -26,22 +26,22 @@ import (
)
func (m *Migrator) migrateCurrentTPaccountAcction() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPAccountActions)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPAccountActions)
if err != nil {
return err
}
for _, tpid := range tpids {
accAct, err := m.storDBIn.GetTPAccountActions(&utils.TPAccountActions{TPid: tpid})
accAct, err := m.storDBIn.StorDB().GetTPAccountActions(&utils.TPAccountActions{TPid: tpid})
if err != nil {
return err
}
if accAct != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPAccountActions(accAct); err != nil {
if err := m.storDBOut.StorDB().SetTPAccountActions(accAct); err != nil {
return err
}
for _, acc := range accAct {
if err := m.storDBIn.RemTpData(utils.TBLTPAccountActions, acc.TPid,
if err := m.storDBIn.StorDB().RemTpData(utils.TBLTPAccountActions, acc.TPid,
map[string]string{"loadid": acc.LoadId, "tenant": acc.Tenant, "account": acc.Account}); err != nil {
return err
}
@@ -56,7 +56,7 @@ func (m *Migrator) migrateCurrentTPaccountAcction() (err error) {
func (m *Migrator) migrateTPaccountacction() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -151,3 +152,4 @@ func testTpAccActITCheckData(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -26,27 +26,28 @@ import (
)
func (m *Migrator) migrateCurrentTPactionplans() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPActionPlans)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPActionPlans)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPActionPlans, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPActionPlans, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
if err != nil {
return err
}
for _, id := range ids {
actPln, err := m.storDBIn.GetTPActionPlans(tpid, id)
actPln, err := m.storDBIn.StorDB().GetTPActionPlans(tpid, id)
if err != nil {
return err
}
if actPln != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPActionPlans(actPln); err != nil {
if err := m.storDBOut.StorDB().SetTPActionPlans(actPln); err != nil {
return err
}
for _, act := range actPln {
if err := m.storDBIn.RemTpData(utils.TBLTPActionPlans, act.TPid, map[string]string{"tag": act.ID}); err != nil {
if err := m.storDBIn.StorDB().RemTpData(utils.TBLTPActionPlans,
act.TPid, map[string]string{"tag": act.ID}); err != nil {
return err
}
}
@@ -61,7 +62,7 @@ func (m *Migrator) migrateCurrentTPactionplans() (err error) {
func (m *Migrator) migrateTPactionplans() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -159,3 +160,4 @@ func testTpActPlnITCheckData(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -26,27 +26,28 @@ import (
)
func (m *Migrator) migrateCurrentTPactiontriggers() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPActionTriggers)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPActionTriggers)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPActionTriggers, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPActionTriggers, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
if err != nil {
return err
}
for _, id := range ids {
actTrg, err := m.storDBIn.GetTPActionTriggers(tpid, id)
actTrg, err := m.storDBIn.StorDB().GetTPActionTriggers(tpid, id)
if err != nil {
return err
}
if actTrg != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPActionTriggers(actTrg); err != nil {
if err := m.storDBOut.StorDB().SetTPActionTriggers(actTrg); err != nil {
return err
}
for _, act := range actTrg {
if err := m.storDBIn.RemTpData(utils.TBLTPActionTriggers, act.TPid, map[string]string{"tag": act.ID}); err != nil {
if err := m.storDBIn.StorDB().RemTpData(
utils.TBLTPActionTriggers, act.TPid, map[string]string{"tag": act.ID}); err != nil {
return err
}
}
@@ -61,7 +62,7 @@ func (m *Migrator) migrateCurrentTPactiontriggers() (err error) {
func (m *Migrator) migrateTPactiontriggers() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -199,3 +200,4 @@ func testTpActTrgITCheckData(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -26,27 +26,27 @@ import (
)
func (m *Migrator) migrateCurrentTPactions() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPActions)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPActions)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPActions, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPActions, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
if err != nil {
return err
}
for _, id := range ids {
action, err := m.storDBIn.GetTPActions(tpid, id)
action, err := m.storDBIn.StorDB().GetTPActions(tpid, id)
if err != nil {
return err
}
if action != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPActions(action); err != nil {
if err := m.storDBOut.StorDB().SetTPActions(action); err != nil {
return err
}
for _, act := range action {
if err := m.storDBIn.RemTpData(utils.TBLTPActions, act.TPid, map[string]string{"tag": act.ID}); err != nil {
if err := m.storDBIn.StorDB().RemTpData(utils.TBLTPActions, act.TPid, map[string]string{"tag": act.ID}); err != nil {
return err
}
}
@@ -61,7 +61,7 @@ func (m *Migrator) migrateCurrentTPactions() (err error) {
func (m *Migrator) migrateTPactions() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -189,3 +190,4 @@ func testTpActITCheckData(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -26,12 +26,12 @@ import (
)
func (m *Migrator) migrateCurrentTPaliases() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPAliases)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPAliases)
if err != nil {
return err
}
for _, tpid := range tpids {
alias, err := m.storDBIn.GetTPAliases(&utils.TPAliases{TPid: tpid})
alias, err := m.storDBIn.StorDB().GetTPAliases(&utils.TPAliases{TPid: tpid})
if err != nil {
return err
}
@@ -40,11 +40,11 @@ func (m *Migrator) migrateCurrentTPaliases() (err error) {
}
if alias != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPAliases(alias); err != nil {
if err := m.storDBOut.StorDB().SetTPAliases(alias); err != nil {
return err
}
for _, ali := range alias {
if err := m.storDBIn.RemTpData(utils.TBLTPAliases, ali.TPid, map[string]string{"tag": ali.GetId()}); err != nil {
if err := m.storDBIn.StorDB().RemTpData(utils.TBLTPAliases, ali.TPid, map[string]string{"tag": ali.GetId()}); err != nil {
return err
}
}
@@ -59,7 +59,7 @@ func (m *Migrator) migrateCurrentTPaliases() (err error) {
func (m *Migrator) migrateTPaliases() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -26,23 +26,23 @@ import (
)
func (m *Migrator) migrateCurrentTPcdrstats() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPCdrStats)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPCdrStats)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPCdrStats, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPCdrStats, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
if err != nil {
return err
}
for _, id := range ids {
dest, err := m.storDBIn.GetTPCdrStats(tpid, id)
dest, err := m.storDBIn.StorDB().GetTPCdrStats(tpid, id)
if err != nil {
return err
}
if dest != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPCdrStats(dest); err != nil {
if err := m.storDBOut.StorDB().SetTPCdrStats(dest); err != nil {
return err
}
m.stats[utils.TpCdrStats] += 1
@@ -56,7 +56,7 @@ func (m *Migrator) migrateCurrentTPcdrstats() (err error) {
func (m *Migrator) migrateTPcdrstats() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -26,24 +26,24 @@ import (
)
func (m *Migrator) migrateCurrentTPderivedchargers() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPDerivedChargers)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPDerivedChargers)
if err != nil {
return err
}
for _, tpid := range tpids {
derivedChargers, err := m.storDBIn.GetTPDerivedChargers(&utils.TPDerivedChargers{TPid: tpid})
derivedChargers, err := m.storDBIn.StorDB().GetTPDerivedChargers(&utils.TPDerivedChargers{TPid: tpid})
if err != nil {
return err
}
if derivedChargers != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPDerivedChargers(derivedChargers); err != nil {
if err := m.storDBOut.StorDB().SetTPDerivedChargers(derivedChargers); err != nil {
return err
}
for _, der := range derivedChargers {
if err := m.storDBIn.RemTpData(utils.TBLTPDerivedChargers,
if err := m.storDBIn.StorDB().RemTpData(utils.TBLTPDerivedChargers,
der.TPid, map[string]string{"loadid": der.LoadId, "direction": der.Direction,
"tenant": der.Tenant, "category": der.Category, "account": der.Account, "subject": der.Subject}); err != nil {
return err
@@ -59,7 +59,7 @@ func (m *Migrator) migrateCurrentTPderivedchargers() (err error) {
func (m *Migrator) migrateTPderivedchargers() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -173,3 +174,4 @@ func testTpDrChgITCheckData(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -26,27 +26,27 @@ import (
)
func (m *Migrator) migrateCurrentTPdestinationrates() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPDestinationRates)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPDestinationRates)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPDestinationRates, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPDestinationRates, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
if err != nil {
return err
}
for _, id := range ids {
destRate, err := m.storDBIn.GetTPDestinationRates(tpid, id, nil)
destRate, err := m.storDBIn.StorDB().GetTPDestinationRates(tpid, id, nil)
if err != nil {
return err
}
if destRate != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPDestinationRates(destRate); err != nil {
if err := m.storDBOut.StorDB().SetTPDestinationRates(destRate); err != nil {
return err
}
for _, dest := range destRate {
if err := m.storDBIn.RemTpData(utils.TBLTPDestinationRates, dest.TPid, map[string]string{"tag": dest.ID}); err != nil {
if err := m.storDBIn.StorDB().RemTpData(utils.TBLTPDestinationRates, dest.TPid, map[string]string{"tag": dest.ID}); err != nil {
return err
}
}
@@ -61,7 +61,7 @@ func (m *Migrator) migrateCurrentTPdestinationrates() (err error) {
func (m *Migrator) migrateTPdestinationrates() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -154,3 +155,4 @@ func testTpDstRtITCheckData(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -26,27 +26,27 @@ import (
)
func (m *Migrator) migrateCurrentTPDestinations() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPDestinations)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPDestinations)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPDestinations, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPDestinations, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
if err != nil {
return err
}
for _, id := range ids {
destinations, err := m.storDBIn.GetTPDestinations(tpid, id)
destinations, err := m.storDBIn.StorDB().GetTPDestinations(tpid, id)
if err != nil {
return err
}
if destinations != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPDestinations(destinations); err != nil {
if err := m.storDBOut.StorDB().SetTPDestinations(destinations); err != nil {
return err
}
for _, dest := range destinations {
if err := m.storDBIn.RemTpData(utils.TBLTPDestinations, dest.TPid, map[string]string{"tag": dest.ID}); err != nil {
if err := m.storDBIn.StorDB().RemTpData(utils.TBLTPDestinations, dest.TPid, map[string]string{"tag": dest.ID}); err != nil {
return err
}
}
@@ -61,7 +61,7 @@ func (m *Migrator) migrateCurrentTPDestinations() (err error) {
func (m *Migrator) migrateTPDestinations() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -148,3 +149,4 @@ func testTpDstITCheckData(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -26,28 +26,28 @@ import (
)
func (m *Migrator) migrateCurrentTPfilters() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPFilters)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPFilters)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPFilters,
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPFilters,
utils.TPDistinctIds{"id"}, map[string]string{}, nil)
if err != nil {
return err
}
for _, id := range ids {
fltrs, err := m.storDBIn.GetTPFilters(tpid, id)
fltrs, err := m.storDBIn.StorDB().GetTPFilters(tpid, id)
if err != nil {
return err
}
if fltrs != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPFilters(fltrs); err != nil {
if err := m.storDBOut.StorDB().SetTPFilters(fltrs); err != nil {
return err
}
for _, fltr := range fltrs {
if err := m.storDBIn.RemTpData(utils.TBLTPFilters,
if err := m.storDBIn.StorDB().RemTpData(utils.TBLTPFilters,
fltr.TPid, map[string]string{"tenant": fltr.Tenant, "id": fltr.ID}); err != nil {
return err
}
@@ -63,7 +63,7 @@ func (m *Migrator) migrateCurrentTPfilters() (err error) {
func (m *Migrator) migrateTPfilters() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -158,3 +159,4 @@ func testTpFltrITCheckData(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -26,24 +26,24 @@ import (
)
func (m *Migrator) migrateCurrentTPrates() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPRates)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPRates)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPRates, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPRates, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
if err != nil {
return err
}
for _, id := range ids {
dest, err := m.storDBIn.GetTPRates(tpid, id)
dest, err := m.storDBIn.StorDB().GetTPRates(tpid, id)
if err != nil {
return err
}
if dest != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPRates(dest); err != nil {
if err := m.storDBOut.StorDB().SetTPRates(dest); err != nil {
return err
}
m.stats[utils.TpRates] += 1
@@ -57,7 +57,7 @@ func (m *Migrator) migrateCurrentTPrates() (err error) {
func (m *Migrator) migrateTPrates() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -26,24 +26,24 @@ import (
)
func (m *Migrator) migrateCurrentTPratingplans() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPRatingPlans)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPRatingPlans)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPRatingPlans, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPRatingPlans, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
if err != nil {
return err
}
if len(ids) != 0 {
for _, id := range ids {
rps, err := m.storDBIn.GetTPRatingPlans(tpid, id, nil)
rps, err := m.storDBIn.StorDB().GetTPRatingPlans(tpid, id, nil)
if err != nil {
return err
}
if rps != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPRatingPlans(rps); err != nil {
if err := m.storDBOut.StorDB().SetTPRatingPlans(rps); err != nil {
return err
}
m.stats[utils.TpRatingPlans] += 1
@@ -58,7 +58,7 @@ func (m *Migrator) migrateCurrentTPratingplans() (err error) {
func (m *Migrator) migrateTPratingplans() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -27,19 +27,19 @@ import (
func (m *Migrator) migrateCurrentTPratingprofiles() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPRateProfiles)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPRateProfiles)
if err != nil {
return err
}
for _, tpid := range tpids {
dest, err := m.storDBIn.GetTPRatingProfiles(&utils.TPRatingProfile{TPid: tpid})
dest, err := m.storDBIn.StorDB().GetTPRatingProfiles(&utils.TPRatingProfile{TPid: tpid})
if err != nil {
return err
}
if dest != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPRatingProfiles(dest); err != nil {
if err := m.storDBOut.StorDB().SetTPRatingProfiles(dest); err != nil {
return err
}
m.stats[utils.TpRatingProfiles] += 1
@@ -52,7 +52,7 @@ func (m *Migrator) migrateCurrentTPratingprofiles() (err error) {
func (m *Migrator) migrateTPratingprofiles() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -26,30 +26,30 @@ import (
)
func (m *Migrator) migrateCurrentTPresources() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPResources)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPResources)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPResources,
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPResources,
utils.TPDistinctIds{"id"}, map[string]string{}, nil)
if err != nil {
return err
}
for _, id := range ids {
resources, err := m.storDBIn.GetTPResources(tpid, id)
resources, err := m.storDBIn.StorDB().GetTPResources(tpid, id)
if err != nil {
return err
}
if resources != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPResources(resources); err != nil {
if err := m.storDBOut.StorDB().SetTPResources(resources); err != nil {
return err
}
for _, resource := range resources {
if err := m.storDBIn.RemTpData(utils.TBLTPResources, resource.TPid,
if err := m.storDBIn.StorDB().RemTpData(utils.TBLTPResources, resource.TPid,
map[string]string{"id": resource.ID}); err != nil {
return err
}
@@ -65,7 +65,7 @@ func (m *Migrator) migrateCurrentTPresources() (err error) {
func (m *Migrator) migrateTPresources() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -159,3 +160,4 @@ func testTpResITCheckData(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -26,25 +26,25 @@ import (
)
func (m *Migrator) migrateCurrentTPsharedgroups() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPSharedGroups)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPSharedGroups)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPSharedGroups, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPSharedGroups, utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
if err != nil {
return err
}
for _, id := range ids {
dest, err := m.storDBIn.GetTPSharedGroups(tpid, id)
dest, err := m.storDBIn.StorDB().GetTPSharedGroups(tpid, id)
if err != nil {
return err
}
if dest != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPSharedGroups(dest); err != nil {
if err := m.storDBOut.StorDB().SetTPSharedGroups(dest); err != nil {
return err
}
m.stats[utils.TpSharedGroups] += 1
@@ -58,7 +58,7 @@ func (m *Migrator) migrateCurrentTPsharedgroups() (err error) {
func (m *Migrator) migrateTPsharedgroups() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -26,30 +26,30 @@ import (
)
func (m *Migrator) migrateCurrentTPstats() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPStats)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPStats)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPStats,
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPStats,
utils.TPDistinctIds{"id"}, map[string]string{}, nil)
if err != nil {
return err
}
for _, id := range ids {
stats, err := m.storDBIn.GetTPStats(tpid, id)
stats, err := m.storDBIn.StorDB().GetTPStats(tpid, id)
if err != nil {
return err
}
if stats != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPStats(stats); err != nil {
if err := m.storDBOut.StorDB().SetTPStats(stats); err != nil {
return err
}
for _, stat := range stats {
if err := m.storDBIn.RemTpData(utils.TBLTPStats, stat.TPid,
if err := m.storDBIn.StorDB().RemTpData(utils.TBLTPStats, stat.TPid,
map[string]string{"id": stat.ID}); err != nil {
return err
}
@@ -65,7 +65,7 @@ func (m *Migrator) migrateCurrentTPstats() (err error) {
func (m *Migrator) migrateTPstats() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -165,3 +166,4 @@ func testTpStatsITCheckData(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -26,30 +26,30 @@ import (
)
func (m *Migrator) migrateCurrentTPSuppliers() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPSuppliers)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPSuppliers)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPSuppliers,
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPSuppliers,
utils.TPDistinctIds{"id"}, map[string]string{}, nil)
if err != nil {
return err
}
for _, id := range ids {
suppliers, err := m.storDBIn.GetTPSuppliers(tpid, id)
suppliers, err := m.storDBIn.StorDB().GetTPSuppliers(tpid, id)
if err != nil {
return err
}
if suppliers != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPSuppliers(suppliers); err != nil {
if err := m.storDBOut.StorDB().SetTPSuppliers(suppliers); err != nil {
return err
}
for _, supplier := range suppliers {
if err := m.storDBIn.RemTpData(utils.TBLTPSuppliers, supplier.TPid,
if err := m.storDBIn.StorDB().RemTpData(utils.TBLTPSuppliers, supplier.TPid,
map[string]string{"tenant": supplier.Tenant, "id": supplier.ID}); err != nil {
return err
}
@@ -66,7 +66,7 @@ func (m *Migrator) migrateCurrentTPSuppliers() (err error) {
func (m *Migrator) migrateTPSuppliers() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -168,3 +169,4 @@ func testTpSplITCheckData(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -26,30 +26,30 @@ import (
)
func (m *Migrator) migrateCurrentTPthresholds() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPThresholds)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPThresholds)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPThresholds,
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPThresholds,
utils.TPDistinctIds{"id"}, map[string]string{}, nil)
if err != nil {
return err
}
for _, id := range ids {
thresholds, err := m.storDBIn.GetTPThresholds(tpid, id)
thresholds, err := m.storDBIn.StorDB().GetTPThresholds(tpid, id)
if err != nil {
return err
}
if thresholds != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPThresholds(thresholds); err != nil {
if err := m.storDBOut.StorDB().SetTPThresholds(thresholds); err != nil {
return err
}
for _, threshold := range thresholds {
if err := m.storDBIn.RemTpData(utils.TBLTPThresholds, threshold.TPid,
if err := m.storDBIn.StorDB().RemTpData(utils.TBLTPThresholds, threshold.TPid,
map[string]string{"tenant": threshold.Tenant, "id": threshold.ID}); err != nil {
return err
}
@@ -65,7 +65,7 @@ func (m *Migrator) migrateCurrentTPthresholds() (err error) {
func (m *Migrator) migrateTPthresholds() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -158,3 +159,4 @@ func testTpTresITCheckData(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -26,29 +26,29 @@ import (
)
func (m *Migrator) migrateCurrentTPTiming() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPTimings)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPTimings)
if err != nil {
return err
}
for _, tpid := range tpids {
ids, err := m.storDBIn.GetTpTableIds(tpid, utils.TBLTPTimings,
ids, err := m.storDBIn.StorDB().GetTpTableIds(tpid, utils.TBLTPTimings,
utils.TPDistinctIds{"tag"}, map[string]string{}, nil)
if err != nil {
return err
}
for _, id := range ids {
tm, err := m.storDBIn.GetTPTimings(tpid, id)
tm, err := m.storDBIn.StorDB().GetTPTimings(tpid, id)
if err != nil {
return err
}
if tm != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPTimings(tm); err != nil {
if err := m.storDBOut.StorDB().SetTPTimings(tm); err != nil {
return err
}
for _, timing := range tm {
if err := m.storDBIn.RemTpData(utils.TBLTPTimings,
if err := m.storDBIn.StorDB().RemTpData(utils.TBLTPTimings,
timing.TPid, map[string]string{"tag": timing.ID}); err != nil {
return err
}
@@ -64,7 +64,7 @@ func (m *Migrator) migrateCurrentTPTiming() (err error) {
func (m *Migrator) migrateTpTimings() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"log"
"path"
@@ -151,3 +152,4 @@ func testTpTimITCheckData(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -26,23 +26,23 @@ import (
)
func (m *Migrator) migrateCurrentTPusers() (err error) {
tpids, err := m.storDBIn.GetTpIds(utils.TBLTPUsers)
tpids, err := m.storDBIn.StorDB().GetTpIds(utils.TBLTPUsers)
if err != nil {
return err
}
for _, tpid := range tpids {
users, err := m.storDBIn.GetTPUsers(&utils.TPUsers{TPid: tpid})
users, err := m.storDBIn.StorDB().GetTPUsers(&utils.TPUsers{TPid: tpid})
if err != nil {
return err
}
if users != nil {
if m.dryRun != true {
if err := m.storDBOut.SetTPUsers(users); err != nil {
if err := m.storDBOut.StorDB().SetTPUsers(users); err != nil {
return err
}
for _, user := range users {
if err := m.storDBIn.RemTpData(utils.TBLTPUsers, user.TPid,
if err := m.storDBIn.StorDB().RemTpData(utils.TBLTPUsers, user.TPid,
map[string]string{"tenant": user.Tenant, "user_name": user.UserName}); err != nil {
return err
}
@@ -58,7 +58,7 @@ func (m *Migrator) migrateCurrentTPusers() (err error) {
func (m *Migrator) migrateTPusers() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -28,19 +28,19 @@ import (
func (m *Migrator) migrateCurrentUser() (err error) {
var ids []string
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.USERS_PREFIX)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.USERS_PREFIX)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.USERS_PREFIX)
usr, err := m.dmIN.GetUser(idg)
usr, err := m.dmIN.DataManager().GetUser(idg)
if err != nil {
return err
}
if usr != nil {
if m.dryRun != true {
if err := m.dmOut.SetUser(usr); err != nil {
if err := m.dmOut.DataManager().SetUser(usr); err != nil {
return err
}
m.stats[utils.User] += 1
@@ -53,7 +53,7 @@ func (m *Migrator) migrateCurrentUser() (err error) {
func (m *Migrator) migrateUser() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,8 +20,10 @@ package migrator
import (
"errors"
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -30,45 +32,44 @@ func NewMigratorDataDB(db_type, host, port, name, user, pass, marshaler string,
dm, err := engine.ConfigureDataStorage(db_type,
host, port, name, user, pass, marshaler,
cacheCfg, loadHistorySize)
var d MigratorDataDB
if err != nil {
return nil, err
}
switch db_type {
case utils.REDIS:
d := newRedisMigrator(dm)
db = d.(MigratorDataDB)
d = newRedisMigrator(dm)
case utils.MONGO:
d := newMongoMigrator(dm)
d = newMongoMigrator(dm)
db = d.(MigratorDataDB)
default:
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'",
db_type, utils.REDIS, utils.MONGO))
}
return d, nil
}
/*
func NewMigratorStorDB(db_type, host, port, name, user, pass, marshaler string,
cacheCfg config.CacheConfig, loadHistorySize int) (db MigratorDataDB, err error) {
dm, err := engine.ConfigureStorStorage(db_type,
host, port, name, user, pass, marshaler,
cacheCfg, loadHistorySize)
func NewMigratorStorDB(db_type, host, port, name, user, pass string,
maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db MigratorStorDB, err error) {
var d MigratorStorDB
storDb, err := engine.ConfigureStorDB(db_type, host, port, name, user, pass,
maxConn, maxIdleConn, connMaxLifetime, cdrsIndexes)
if err != nil {
return nil, err
}
switch db_type {
case utils.MONGO:
d := newRedisMigrator(dm)
db = d.(MigratorDataDB)
d = newMongoStorDBMigrator(storDb)
db = d.(MigratorStorDB)
case utils.MYSQL:
d := newMongoMigrator(dm)
db = d.(MigratorDataDB)
d = newMigratorSQL(storDb)
db = d.(MigratorStorDB)
default:
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'",
db_type, utils.REDIS, utils.MONGO))
db_type, utils.MONGO, utils.MYSQL))
}
return d, nil
}
*/
/*

View File

@@ -46,8 +46,15 @@ type AtKeyValue struct {
}
func newMongoMigrator(dm *engine.DataManager) (mgoMig *mongoMigrator) {
mgoMig.dm = dm
mgoMig.mgoDB = dm.DataDB().(*engine.MongoStorage)
return &mongoMigrator{
dm: dm,
mgoDB: dm.DataDB().(*engine.MongoStorage),
qryIter: nil,
}
}
func (mgoMig *mongoMigrator) DataManager() *engine.DataManager {
return mgoMig.dm
}
//Account methods
@@ -55,7 +62,7 @@ func newMongoMigrator(dm *engine.DataManager) (mgoMig *mongoMigrator) {
//get
func (v1ms *mongoMigrator) getv1Account() (v1Acnt *v1Account, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Find(nil).Iter()
v1ms.qryIter = v1ms.mgoDB.DB().C(v1AccountDBPrefix).Find(nil).Iter()
}
v1ms.qryIter.Next(&v1Acnt)
@@ -69,7 +76,7 @@ func (v1ms *mongoMigrator) getv1Account() (v1Acnt *v1Account, err error) {
//set
func (v1ms *mongoMigrator) setV1Account(x *v1Account) (err error) {
if err := v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Insert(x); err != nil {
if err := v1ms.mgoDB.DB().C(v1AccountDBPrefix).Insert(x); err != nil {
return err
}
return
@@ -79,7 +86,7 @@ func (v1ms *mongoMigrator) setV1Account(x *v1Account) (err error) {
//get
func (v1ms *mongoMigrator) getv2Account() (v2Acnt *v2Account, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v2AccountsCol).Find(nil).Iter()
v1ms.qryIter = v1ms.mgoDB.DB().C(v2AccountsCol).Find(nil).Iter()
}
v1ms.qryIter.Next(&v2Acnt)
@@ -93,7 +100,7 @@ func (v1ms *mongoMigrator) getv2Account() (v2Acnt *v2Account, err error) {
//set
func (v1ms *mongoMigrator) setV2Account(x *v2Account) (err error) {
if err := v1ms.session.DB(v1ms.db).C(v2AccountsCol).Insert(x); err != nil {
if err := v1ms.mgoDB.DB().C(v2AccountsCol).Insert(x); err != nil {
return err
}
return
@@ -104,7 +111,7 @@ func (v1ms *mongoMigrator) setV2Account(x *v2Account) (err error) {
func (v1ms *mongoMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
var strct *AtKeyValue
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actiontimings").Find(nil).Iter()
v1ms.qryIter = v1ms.mgoDB.DB().C("actiontimings").Find(nil).Iter()
}
v1ms.qryIter.Next(&strct)
if strct == nil {
@@ -118,7 +125,7 @@ func (v1ms *mongoMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error)
//set
func (v1ms *mongoMigrator) setV1ActionPlans(x *v1ActionPlans) (err error) {
key := utils.ACTION_PLAN_PREFIX + (*x)[0].Id
if err := v1ms.session.DB(v1ms.db).C("actiontimings").Insert(&AtKeyValue{key, *x}); err != nil {
if err := v1ms.mgoDB.DB().C("actiontimings").Insert(&AtKeyValue{key, *x}); err != nil {
return err
}
return
@@ -129,7 +136,7 @@ func (v1ms *mongoMigrator) setV1ActionPlans(x *v1ActionPlans) (err error) {
func (v1ms *mongoMigrator) getV1Actions() (v1acs *v1Actions, err error) {
var strct *AcKeyValue
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actions").Find(nil).Iter()
v1ms.qryIter = v1ms.mgoDB.DB().C("actions").Find(nil).Iter()
}
v1ms.qryIter.Next(&strct)
if strct == nil {
@@ -144,7 +151,7 @@ func (v1ms *mongoMigrator) getV1Actions() (v1acs *v1Actions, err error) {
//set
func (v1ms *mongoMigrator) setV1Actions(x *v1Actions) (err error) {
key := utils.ACTION_PREFIX + (*x)[0].Id
if err := v1ms.session.DB(v1ms.db).C("actions").Insert(&AcKeyValue{key, *x}); err != nil {
if err := v1ms.mgoDB.DB().C("actions").Insert(&AcKeyValue{key, *x}); err != nil {
return err
}
return
@@ -165,7 +172,7 @@ func (v1ms *mongoMigrator) setV1ActionTriggers(x *v1ActionTriggers) (err error)
//get
func (v1ms *mongoMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.SHARED_GROUP_PREFIX).Find(nil).Iter()
v1ms.qryIter = v1ms.mgoDB.DB().C(utils.SHARED_GROUP_PREFIX).Find(nil).Iter()
}
v1ms.qryIter.Next(&v1sg)
if v1sg == nil {
@@ -178,7 +185,7 @@ func (v1ms *mongoMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
//set
func (v1ms *mongoMigrator) setV1SharedGroup(x *v1SharedGroup) (err error) {
if err := v1ms.session.DB(v1ms.db).C(utils.SHARED_GROUP_PREFIX).Insert(x); err != nil {
if err := v1ms.mgoDB.DB().C(utils.SHARED_GROUP_PREFIX).Insert(x); err != nil {
return err
}
return
@@ -188,7 +195,7 @@ func (v1ms *mongoMigrator) setV1SharedGroup(x *v1SharedGroup) (err error) {
//get
func (v1ms *mongoMigrator) getV1Stats() (v1st *v1Stat, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.CDR_STATS_PREFIX).Find(nil).Iter()
v1ms.qryIter = v1ms.mgoDB.DB().C(utils.CDR_STATS_PREFIX).Find(nil).Iter()
}
v1ms.qryIter.Next(&v1st)
if v1st == nil {
@@ -201,7 +208,7 @@ func (v1ms *mongoMigrator) getV1Stats() (v1st *v1Stat, err error) {
//set
func (v1ms *mongoMigrator) setV1Stats(x *v1Stat) (err error) {
if err := v1ms.session.DB(v1ms.db).C(utils.CDR_STATS_PREFIX).Insert(x); err != nil {
if err := v1ms.mgoDB.DB().C(utils.CDR_STATS_PREFIX).Insert(x); err != nil {
return err
}
return
@@ -211,7 +218,7 @@ func (v1ms *mongoMigrator) setV1Stats(x *v1Stat) (err error) {
//get
func (v1ms *mongoMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1ActionTriggersCol).Find(nil).Iter()
v1ms.qryIter = v1ms.mgoDB.DB().C(v1ActionTriggersCol).Find(nil).Iter()
}
v1ms.qryIter.Next(&v2at)
if v2at == nil {
@@ -224,7 +231,7 @@ func (v1ms *mongoMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err erro
//set
func (v1ms *mongoMigrator) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
if err := v1ms.session.DB(v1ms.db).C(v1ActionTriggersCol).Insert(x); err != nil {
if err := v1ms.mgoDB.DB().C(v1ActionTriggersCol).Insert(x); err != nil {
return err
}
return
@@ -234,7 +241,7 @@ func (v1ms *mongoMigrator) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
//get
func (v1ms *mongoMigrator) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1AttributeProfilesCol).Find(nil).Iter()
v1ms.qryIter = v1ms.mgoDB.DB().C(v1AttributeProfilesCol).Find(nil).Iter()
}
v1ms.qryIter.Next(&v1attrPrf)
if v1attrPrf == nil {
@@ -247,7 +254,7 @@ func (v1ms *mongoMigrator) getV1AttributeProfile() (v1attrPrf *v1AttributeProfil
//set
func (v1ms *mongoMigrator) setV1AttributeProfile(x *v1AttributeProfile) (err error) {
if err := v1ms.session.DB(v1ms.db).C(v1AttributeProfilesCol).Insert(x); err != nil {
if err := v1ms.mgoDB.DB().C(v1AttributeProfilesCol).Insert(x); err != nil {
return err
}
return

View File

@@ -21,13 +21,32 @@ package migrator
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/mgo"
)
func newMongoStorDBMigrator(stor engine.StorDB) (mgoMig *mongoStorDBMigrator) {
return &mongoStorDBMigrator{
storDB: &stor,
mgoDB: stor.(*engine.MongoStorage),
qryIter: nil,
}
}
type mongoStorDBMigrator struct {
storDB *engine.StorDB
mgoDB *engine.MongoStorage
qryIter *mgo.Iter
}
func (mgoMig *mongoStorDBMigrator) StorDB() engine.StorDB {
return *mgoMig.storDB
}
//CDR methods
//get
func (v1ms *mongoMigrator) getV1CDR() (v1Cdr *v1Cdrs, err error) {
func (v1ms *mongoStorDBMigrator) getV1CDR() (v1Cdr *v1Cdrs, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(engine.ColCDRs).Find(nil).Iter()
v1ms.qryIter = v1ms.mgoDB.DB().C(engine.ColCDRs).Find(nil).Iter()
}
v1ms.qryIter.Next(&v1Cdr)
@@ -40,8 +59,8 @@ func (v1ms *mongoMigrator) getV1CDR() (v1Cdr *v1Cdrs, err error) {
}
//set
func (v1ms *mongoMigrator) setV1CDR(v1Cdr *v1Cdrs) (err error) {
if err = v1ms.session.DB(v1ms.db).C(engine.ColCDRs).Insert(v1Cdr); err != nil {
func (v1ms *mongoStorDBMigrator) setV1CDR(v1Cdr *v1Cdrs) (err error) {
if err = v1ms.mgoDB.DB().C(engine.ColCDRs).Insert(v1Cdr); err != nil {
return err
}
return
@@ -49,9 +68,9 @@ func (v1ms *mongoMigrator) setV1CDR(v1Cdr *v1Cdrs) (err error) {
//SMCost methods
//get
func (v1ms *mongoMigrator) getSMCost() (v2Cost *v2SessionsCost, err error) {
func (v1ms *mongoStorDBMigrator) getV2SMCost() (v2Cost *v2SessionsCost, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.SessionsCostsTBL).Find(nil).Iter()
v1ms.qryIter = v1ms.mgoDB.DB().C(utils.SessionsCostsTBL).Find(nil).Iter()
}
v1ms.qryIter.Next(&v2Cost)
@@ -64,16 +83,16 @@ func (v1ms *mongoMigrator) getSMCost() (v2Cost *v2SessionsCost, err error) {
}
//set
func (v1ms *mongoMigrator) setSMCost(v2Cost *v2SessionsCost) (err error) {
if err = v1ms.session.DB(v1ms.db).C(utils.SessionsCostsTBL).Insert(v2Cost); err != nil {
func (v1ms *mongoStorDBMigrator) setV2SMCost(v2Cost *v2SessionsCost) (err error) {
if err = v1ms.mgoDB.DB().C(utils.SessionsCostsTBL).Insert(v2Cost); err != nil {
return err
}
return
}
//remove
func (v1ms *mongoMigrator) remSMCost(v2Cost *v2SessionsCost) (err error) {
if err = v1ms.session.DB(v1ms.db).C(utils.SessionsCostsTBL).Remove(nil); err != nil {
func (v1ms *mongoStorDBMigrator) remV2SMCost(v2Cost *v2SessionsCost) (err error) {
if err = v1ms.mgoDB.DB().C(utils.SessionsCostsTBL).Remove(nil); err != nil {
return err
}
return

View File

@@ -31,8 +31,14 @@ type redisMigrator struct {
}
func newRedisMigrator(dm *engine.DataManager) (rM *redisMigrator) {
rM.dm = dm
rM.rds = dm.DataDB().(*engine.RedisStorage)
return &redisMigrator{
dm: dm,
rds: dm.DataDB().(*engine.RedisStorage),
}
}
func (rdsMig *redisMigrator) DataManager() *engine.DataManager {
return rdsMig.dm
}
//Account methods
@@ -40,7 +46,7 @@ func newRedisMigrator(dm *engine.DataManager) (rM *redisMigrator) {
//get
func (v1rs *redisMigrator) getv1Account() (v1Acnt *v1Account, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(v1AccountDBPrefix)
v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(v1AccountDBPrefix)
if err != nil {
return
} else if len(v1rs.dataKeys) == 0 {
@@ -49,12 +55,12 @@ func (v1rs *redisMigrator) getv1Account() (v1Acnt *v1Account, err error) {
v1rs.qryIdx = utils.IntPointer(0)
}
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err != nil {
return nil, err
}
v1Acnt = &v1Account{Id: v1rs.dataKeys[*v1rs.qryIdx]}
if err := v1rs.ms.Unmarshal(strVal, v1Acnt); err != nil {
if err := v1rs.rds.Marshaler().Unmarshal(strVal, v1Acnt); err != nil {
return nil, err
}
*v1rs.qryIdx = *v1rs.qryIdx + 1
@@ -68,11 +74,11 @@ func (v1rs *redisMigrator) getv1Account() (v1Acnt *v1Account, err error) {
//set
func (v1rs *redisMigrator) setV1Account(x *v1Account) (err error) {
key := v1AccountDBPrefix + x.Id
bit, err := v1rs.ms.Marshal(x)
bit, err := v1rs.rds.Marshaler().Marshal(x)
if err != nil {
return err
}
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
if err = v1rs.rds.Cmd("SET", key, bit).Err; err != nil {
return err
}
return
@@ -82,7 +88,7 @@ func (v1rs *redisMigrator) setV1Account(x *v1Account) (err error) {
//get
func (v1rs *redisMigrator) getv2Account() (v2Acnt *v2Account, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACCOUNT_PREFIX)
v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.ACCOUNT_PREFIX)
if err != nil {
return
} else if len(v1rs.dataKeys) == 0 {
@@ -91,12 +97,12 @@ func (v1rs *redisMigrator) getv2Account() (v2Acnt *v2Account, err error) {
v1rs.qryIdx = utils.IntPointer(0)
}
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err != nil {
return nil, err
}
v2Acnt = &v2Account{ID: v1rs.dataKeys[*v1rs.qryIdx]}
if err := v1rs.ms.Unmarshal(strVal, v2Acnt); err != nil {
if err := v1rs.rds.Marshaler().Unmarshal(strVal, v2Acnt); err != nil {
return nil, err
}
*v1rs.qryIdx = *v1rs.qryIdx + 1
@@ -110,11 +116,11 @@ func (v1rs *redisMigrator) getv2Account() (v2Acnt *v2Account, err error) {
//set
func (v1rs *redisMigrator) setV2Account(x *v2Account) (err error) {
key := utils.ACCOUNT_PREFIX + x.ID
bit, err := v1rs.ms.Marshal(x)
bit, err := v1rs.rds.Marshaler().Marshal(x)
if err != nil {
return err
}
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
if err = v1rs.rds.Cmd("SET", key, bit).Err; err != nil {
return err
}
return
@@ -124,7 +130,7 @@ func (v1rs *redisMigrator) setV2Account(x *v2Account) (err error) {
//get
func (v1rs *redisMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PLAN_PREFIX)
v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.ACTION_PLAN_PREFIX)
if err != nil {
return
} else if len(v1rs.dataKeys) == 0 {
@@ -133,11 +139,11 @@ func (v1rs *redisMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error)
v1rs.qryIdx = utils.IntPointer(0)
}
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err != nil {
return nil, err
}
if err := v1rs.ms.Unmarshal(strVal, &v1aps); err != nil {
if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v1aps); err != nil {
return nil, err
}
*v1rs.qryIdx = *v1rs.qryIdx + 1
@@ -151,11 +157,11 @@ func (v1rs *redisMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error)
//set
func (v1rs *redisMigrator) setV1ActionPlans(x *v1ActionPlans) (err error) {
key := utils.ACTION_PLAN_PREFIX + (*x)[0].Id
bit, err := v1rs.ms.Marshal(x)
bit, err := v1rs.rds.Marshaler().Marshal(x)
if err != nil {
return err
}
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
if err = v1rs.rds.Cmd("SET", key, bit).Err; err != nil {
return err
}
return
@@ -165,7 +171,7 @@ func (v1rs *redisMigrator) setV1ActionPlans(x *v1ActionPlans) (err error) {
//get
func (v1rs *redisMigrator) getV1Actions() (v1acs *v1Actions, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PREFIX)
v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.ACTION_PREFIX)
if err != nil {
return
} else if len(v1rs.dataKeys) == 0 {
@@ -174,11 +180,11 @@ func (v1rs *redisMigrator) getV1Actions() (v1acs *v1Actions, err error) {
v1rs.qryIdx = utils.IntPointer(0)
}
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err != nil {
return nil, err
}
if err := v1rs.ms.Unmarshal(strVal, &v1acs); err != nil {
if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v1acs); err != nil {
return nil, err
}
*v1rs.qryIdx = *v1rs.qryIdx + 1
@@ -192,11 +198,11 @@ func (v1rs *redisMigrator) getV1Actions() (v1acs *v1Actions, err error) {
//set
func (v1rs *redisMigrator) setV1Actions(x *v1Actions) (err error) {
key := utils.ACTION_PREFIX + (*x)[0].Id
bit, err := v1rs.ms.Marshal(x)
bit, err := v1rs.rds.Marshaler().Marshal(x)
if err != nil {
return err
}
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
if err = v1rs.rds.Cmd("SET", key, bit).Err; err != nil {
return err
}
return
@@ -206,7 +212,7 @@ func (v1rs *redisMigrator) setV1Actions(x *v1Actions) (err error) {
//get
func (v1rs *redisMigrator) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_TRIGGER_PREFIX)
v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.ACTION_TRIGGER_PREFIX)
if err != nil {
return
} else if len(v1rs.dataKeys) == 0 {
@@ -215,11 +221,11 @@ func (v1rs *redisMigrator) getV1ActionTriggers() (v1acts *v1ActionTriggers, err
v1rs.qryIdx = utils.IntPointer(0)
}
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err != nil {
return nil, err
}
if err := v1rs.ms.Unmarshal(strVal, &v1acts); err != nil {
if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v1acts); err != nil {
return nil, err
}
*v1rs.qryIdx = *v1rs.qryIdx + 1
@@ -233,11 +239,11 @@ func (v1rs *redisMigrator) getV1ActionTriggers() (v1acts *v1ActionTriggers, err
//set
func (v1rs *redisMigrator) setV1ActionTriggers(x *v1ActionTriggers) (err error) {
key := utils.ACTION_TRIGGER_PREFIX + (*x)[0].Id
bit, err := v1rs.ms.Marshal(x)
bit, err := v1rs.rds.Marshaler().Marshal(x)
if err != nil {
return err
}
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
if err = v1rs.rds.Cmd("SET", key, bit).Err; err != nil {
return err
}
return
@@ -247,7 +253,7 @@ func (v1rs *redisMigrator) setV1ActionTriggers(x *v1ActionTriggers) (err error)
//get
func (v1rs *redisMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.SHARED_GROUP_PREFIX)
v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.SHARED_GROUP_PREFIX)
if err != nil {
return
} else if len(v1rs.dataKeys) == 0 {
@@ -256,11 +262,11 @@ func (v1rs *redisMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
v1rs.qryIdx = utils.IntPointer(0)
}
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err != nil {
return nil, err
}
if err := v1rs.ms.Unmarshal(strVal, &v1sg); err != nil {
if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v1sg); err != nil {
return nil, err
}
*v1rs.qryIdx = *v1rs.qryIdx + 1
@@ -274,11 +280,11 @@ func (v1rs *redisMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
//set
func (v1rs *redisMigrator) setV1SharedGroup(x *v1SharedGroup) (err error) {
key := utils.SHARED_GROUP_PREFIX + x.Id
bit, err := v1rs.ms.Marshal(x)
bit, err := v1rs.rds.Marshaler().Marshal(x)
if err != nil {
return err
}
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
if err = v1rs.rds.Cmd("SET", key, bit).Err; err != nil {
return err
}
return
@@ -288,7 +294,7 @@ func (v1rs *redisMigrator) setV1SharedGroup(x *v1SharedGroup) (err error) {
//get
func (v1rs *redisMigrator) getV1Stats() (v1st *v1Stat, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.CDR_STATS_PREFIX)
v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.CDR_STATS_PREFIX)
if err != nil {
return
} else if len(v1rs.dataKeys) == 0 {
@@ -297,11 +303,11 @@ func (v1rs *redisMigrator) getV1Stats() (v1st *v1Stat, err error) {
v1rs.qryIdx = utils.IntPointer(0)
}
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err != nil {
return nil, err
}
if err := v1rs.ms.Unmarshal(strVal, &v1st); err != nil {
if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v1st); err != nil {
return nil, err
}
*v1rs.qryIdx = *v1rs.qryIdx + 1
@@ -315,11 +321,11 @@ func (v1rs *redisMigrator) getV1Stats() (v1st *v1Stat, err error) {
//set
func (v1rs *redisMigrator) setV1Stats(x *v1Stat) (err error) {
key := utils.CDR_STATS_PREFIX + x.Id
bit, err := v1rs.ms.Marshal(x)
bit, err := v1rs.rds.Marshaler().Marshal(x)
if err != nil {
return err
}
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
if err = v1rs.rds.Cmd("SET", key, bit).Err; err != nil {
return err
}
return
@@ -329,7 +335,7 @@ func (v1rs *redisMigrator) setV1Stats(x *v1Stat) (err error) {
//get
func (v1rs *redisMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_TRIGGER_PREFIX)
v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.ACTION_TRIGGER_PREFIX)
if err != nil {
return
} else if len(v1rs.dataKeys) == 0 {
@@ -338,11 +344,11 @@ func (v1rs *redisMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err erro
v1rs.qryIdx = utils.IntPointer(0)
}
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err != nil {
return nil, err
}
if err := v1rs.ms.Unmarshal(strVal, &v2at); err != nil {
if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v2at); err != nil {
return nil, err
}
*v1rs.qryIdx = *v1rs.qryIdx + 1
@@ -356,11 +362,11 @@ func (v1rs *redisMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err erro
//set
func (v1rs *redisMigrator) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
key := utils.ACTION_TRIGGER_PREFIX + x.ID
bit, err := v1rs.ms.Marshal(x)
bit, err := v1rs.rds.Marshaler().Marshal(x)
if err != nil {
return err
}
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
if err = v1rs.rds.Cmd("SET", key, bit).Err; err != nil {
return err
}
return
@@ -371,7 +377,7 @@ func (v1rs *redisMigrator) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
func (v1rs *redisMigrator) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) {
var v1attr *v1AttributeProfile
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.AttributeProfilePrefix)
v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.AttributeProfilePrefix)
if err != nil {
return
} else if len(v1rs.dataKeys) == 0 {
@@ -380,11 +386,11 @@ func (v1rs *redisMigrator) getV1AttributeProfile() (v1attrPrf *v1AttributeProfil
v1rs.qryIdx = utils.IntPointer(0)
}
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err != nil {
return nil, err
}
if err := v1rs.ms.Unmarshal(strVal, &v1attr); err != nil {
if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v1attr); err != nil {
return nil, err
}
*v1rs.qryIdx = *v1rs.qryIdx + 1
@@ -398,11 +404,11 @@ func (v1rs *redisMigrator) getV1AttributeProfile() (v1attrPrf *v1AttributeProfil
//set
func (v1rs *redisMigrator) setV1AttributeProfile(x *v1AttributeProfile) (err error) {
key := utils.AttributeProfilePrefix + utils.ConcatenatedKey(x.Tenant, x.ID)
bit, err := v1rs.ms.Marshal(x)
bit, err := v1rs.rds.Marshaler().Marshal(x)
if err != nil {
return err
}
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
if err = v1rs.rds.Cmd("SET", key, bit).Err; err != nil {
return err
}
return

View File

@@ -27,33 +27,44 @@ import (
_ "github.com/go-sql-driver/mysql"
)
type migratorSQL struct {
storDB *engine.StorDB
sqlDB *sql.DB
rowIter *sql.Rows
func newMigratorSQL(stor engine.StorDB) (sqlMig *migratorSQL) {
return &migratorSQL{
storDB: &stor,
sqlStorage: stor.(*engine.SQLStorage),
}
}
func (sqlStorage *migratorSQL) getV1CDR() (v1Cdr *v1Cdrs, err error) {
if sqlStorage.rowIter == nil {
sqlStorage.rowIter, err = sqlStorage.Db.Query("SELECT * FROM cdrs")
type migratorSQL struct {
storDB *engine.StorDB
sqlStorage *engine.SQLStorage
rowIter *sql.Rows
}
func (sqlMig *migratorSQL) StorDB() engine.StorDB {
return *sqlMig.storDB
}
func (mgSQL *migratorSQL) getV1CDR() (v1Cdr *v1Cdrs, err error) {
if mgSQL.rowIter == nil {
mgSQL.rowIter, err = mgSQL.sqlStorage.Db.Query("SELECT * FROM cdrs")
if err != nil {
return nil, err
}
}
cdrSql := new(engine.CDRsql)
sqlStorage.rowIter.Scan(&cdrSql)
mgSQL.rowIter.Scan(&cdrSql)
v1Cdr, err = NewV1CDRFromCDRSql(cdrSql)
if sqlStorage.rowIter.Next() {
if mgSQL.rowIter.Next() {
v1Cdr = nil
sqlStorage.rowIter = nil
mgSQL.rowIter = nil
return nil, utils.ErrNoMoreData
}
return v1Cdr, nil
}
func (sqlStorage *migratorSQL) setV1CDR(v1Cdr *v1Cdrs) (err error) {
tx := sqlStorage.db.Begin()
func (mgSQL *migratorSQL) setV1CDR(v1Cdr *v1Cdrs) (err error) {
tx := mgSQL.sqlStorage.ExportGormDB().Begin()
cdrSql := v1Cdr.AsCDRsql()
cdrSql.CreatedAt = time.Now()
saved := tx.Save(cdrSql)
@@ -64,27 +75,27 @@ func (sqlStorage *migratorSQL) setV1CDR(v1Cdr *v1Cdrs) (err error) {
return nil
}
func (sqlStorage *migratorSQL) getSMCost() (v2Cost *v2SessionsCost, err error) {
if sqlStorage.rowIter == nil {
sqlStorage.rowIter, err = sqlStorage.Db.Query("SELECT * FROM sessions_costs")
func (mgSQL *migratorSQL) getV2SMCost() (v2Cost *v2SessionsCost, err error) {
if mgSQL.rowIter == nil {
mgSQL.rowIter, err = mgSQL.sqlStorage.Db.Query("SELECT * FROM sessions_costs")
if err != nil {
return nil, err
}
}
scSql := new(engine.SessionsCostsSQL)
sqlStorage.rowIter.Scan(&scSql)
mgSQL.rowIter.Scan(&scSql)
v2Cost, err = NewV2SessionsCostFromSessionsCostSql(scSql)
if sqlStorage.rowIter.Next() {
if mgSQL.rowIter.Next() {
v2Cost = nil
sqlStorage.rowIter = nil
mgSQL.rowIter = nil
return nil, utils.ErrNoMoreData
}
return v2Cost, nil
}
func (sqlStorage *migratorSQL) setSMCost(v2Cost *v2SessionsCost) (err error) {
tx := sqlStorage.db.Begin()
func (mgSQL *migratorSQL) setV2SMCost(v2Cost *v2SessionsCost) (err error) {
tx := mgSQL.sqlStorage.ExportGormDB().Begin()
smSql := v2Cost.AsSessionsCostSql()
smSql.CreatedAt = time.Now()
saved := tx.Save(smSql)
@@ -95,8 +106,8 @@ func (sqlStorage *migratorSQL) setSMCost(v2Cost *v2SessionsCost) (err error) {
return
}
func (sqlStorage *migratorSQL) remSMCost(v2Cost *v2SessionsCost) (err error) {
tx := sqlStorage.db.Begin()
func (mgSQL *migratorSQL) remV2SMCost(v2Cost *v2SessionsCost) (err error) {
tx := mgSQL.sqlStorage.ExportGormDB().Begin()
var rmParam *engine.SessionsCostsSQL
if v2Cost != nil {
rmParam = &engine.SessionsCostsSQL{Cgrid: v2Cost.CGRID,