Updated CDRs migration process

This commit is contained in:
adragusin
2020-06-23 18:00:06 +03:00
committed by Dan Christian Bogos
parent 3d83b58945
commit 80784ce4b4
7 changed files with 92 additions and 36 deletions

View File

@@ -189,8 +189,10 @@ func (m *Migrator) migrateAlias2Attributes() (err error) {
if m.dryRun {
return
}
if err = m.removeAlias2Attributes(); err != nil {
return
if !m.sameDataDB {
if err = m.removeAlias2Attributes(); err != nil {
return
}
}
// All done, update version wtih current one
vrs := engine.Versions{Alias: 0} //engine.CurrentDataDBVersions()[utils.Alias]}

View File

@@ -210,9 +210,11 @@ func testAlsITMigrateAndMove(t *testing.T) {
if !reflect.DeepEqual(*attrProf, *result) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(attrProf), utils.ToJSON(result))
}
//check if old account was deleted
if _, err = alsMigrator.dmIN.getV1Alias(); err != utils.ErrNoMoreData {
t.Error("Error should be not found : ", err)
//check if old account was deleted (only if dmIN != dmOut)
if !alsMigrator.sameDataDB {
if _, err = alsMigrator.dmIN.getV1Alias(); err != utils.ErrNoMoreData {
t.Error("Error should be not found : ", err)
}
}
expAlsIdx := map[string]utils.StringSet{

View File

@@ -46,7 +46,7 @@ func (m *Migrator) migrateCurrentCDRs() (err error) {
func (m *Migrator) migrateCDRs() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBIn.StorDB().GetVersions("")
vrs, err = m.storDBIn.StorDB().GetVersions(utils.EmptyString)
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
@@ -58,49 +58,73 @@ func (m *Migrator) migrateCDRs() (err error) {
utils.UndefinedVersion,
"version number is not defined for Actions")
}
switch vrs[utils.CDRs] {
case 1:
if err = m.migrateV1CDRs(); err != nil {
return err
}
case current[utils.CDRs]:
if err = m.migrateCurrentCDRs(); err != nil {
return err
}
}
return m.ensureIndexesStorDB(engine.ColCDRs)
}
func (m *Migrator) migrateV1CDRs() (err error) {
var v1CDR *v1Cdrs
migrated := true
var v2 *engine.CDR
for {
v1CDR, err = m.storDBIn.getV1CDR()
if err != nil && err != utils.ErrNoMoreData {
return err
version := vrs[utils.CDRs]
for {
switch vrs[utils.CDRs] {
case current[utils.CDRs]:
migrated = false
if err = m.migrateCurrentCDRs(); err != nil {
return err
}
case 1:
if v2, err = m.migrateV1CDRs(); err != nil && err != utils.ErrNoMoreData {
return err
}
version = 2
}
if version == current[utils.CDRs] || err == utils.ErrNoMoreData {
break
}
}
if err == utils.ErrNoMoreData {
if err == utils.ErrNoMoreData || !migrated {
break
}
if v1CDR == nil || m.dryRun {
continue
if !m.dryRun && migrated {
//set action plan
if err = m.storDBOut.StorDB().SetCDR(v2, true); err != nil {
return err
}
}
cdr := v1CDR.V1toV2Cdr()
if err = m.storDBOut.StorDB().SetCDR(cdr, true); err != nil {
return err
}
m.stats[utils.CDRs] += 1
}
if m.dryRun {
return
m.stats[utils.CDRs]++
}
// All done, update version wtih current one
vrs := engine.Versions{utils.CDRs: engine.CurrentStorDBVersions()[utils.CDRs]}
vrs = engine.Versions{utils.CDRs: engine.CurrentStorDBVersions()[utils.CDRs]}
if err = m.storDBOut.StorDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating CDRs version into StorDB", err.Error()))
}
return m.ensureIndexesStorDB(engine.ColCDRs)
}
func (m *Migrator) removeV1CDRs() (err error) {
var v1CDR *v1Cdrs
if v1CDR, err = m.storDBIn.getV1CDR(); err != nil {
return err
}
if v1CDR == nil {
return
}
if err = m.storDBIn.remV1CDRs(v1CDR); err != nil {
return
}
return
}
func (m *Migrator) migrateV1CDRs() (cdr *engine.CDR, err error) {
var v1CDR *v1Cdrs
if v1CDR, err = m.storDBIn.getV1CDR(); err != nil {
return nil, err
}
if v1CDR == nil {
return
}
cdr = v1CDR.V1toV2Cdr()
return
}

View File

@@ -25,6 +25,7 @@ import (
type MigratorStorDB interface {
getV1CDR() (v1Cdr *v1Cdrs, err error)
setV1CDR(v1Cdr *v1Cdrs) (err error)
remV1CDRs(v1Cdr *v1Cdrs) (err error)
createV1SMCosts() (err error)
renameV1SMCosts() (err error)
getV2SMCost() (v2Cost *v2SessionsCost, err error)

View File

@@ -54,6 +54,11 @@ func (iDBMig *internalStorDBMigrator) setV1CDR(v1Cdr *v1Cdrs) (err error) {
return utils.ErrNotImplemented
}
//rem
func (iDBMig *internalStorDBMigrator) remV1CDRs(v1Cdr *v1Cdrs) (err error) {
return utils.ErrNotImplemented
}
//SMCost methods
//rename
func (iDBMig *internalStorDBMigrator) renameV1SMCosts() (err error) {

View File

@@ -74,6 +74,12 @@ func (v1ms *mongoStorDBMigrator) setV1CDR(v1Cdr *v1Cdrs) (err error) {
return
}
//rem
func (v1ms *mongoStorDBMigrator) remV1CDRs(v1Cdr *v1Cdrs) (err error) {
_, err = v1ms.mgoDB.DB().Collection(engine.ColCDRs).DeleteOne(v1ms.mgoDB.GetContext(), v1Cdr)
return
}
//SMCost methods
//rename
func (v1ms *mongoStorDBMigrator) renameV1SMCosts() (err error) {

View File

@@ -80,6 +80,22 @@ func (mgSQL *migratorSQL) setV1CDR(v1Cdr *v1Cdrs) (err error) {
return nil
}
//rem
func (mgSQL *migratorSQL) remV1CDRs(v1Cdr *v1Cdrs) (err error) {
tx := mgSQL.sqlStorage.ExportGormDB().Begin()
var rmParam *v1Cdrs
if v1Cdr != nil {
rmParam = &v1Cdrs{CGRID: v1Cdr.CGRID,
RunID: v1Cdr.RunID}
}
if err := tx.Where(rmParam).Delete(v1Cdrs{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return nil
}
func (mgSQL *migratorSQL) renameV1SMCosts() (err error) {
qry := "RENAME TABLE sm_costs TO session_costs;"
if mgSQL.StorDB().GetStorageType() == utils.POSTGRES {