diff --git a/migrator/alias.go b/migrator/alias.go index 53d254acd..80201c3a4 100644 --- a/migrator/alias.go +++ b/migrator/alias.go @@ -189,8 +189,10 @@ func (m *Migrator) migrateAlias2Attributes() (err error) { if m.dryRun { return } - if err = m.removeAlias2Attributes(); err != nil { - return + if !m.sameDataDB { + if err = m.removeAlias2Attributes(); err != nil { + return + } } // All done, update version wtih current one vrs := engine.Versions{Alias: 0} //engine.CurrentDataDBVersions()[utils.Alias]} diff --git a/migrator/alias_it_test.go b/migrator/alias_it_test.go index 12fb13424..248ba5848 100644 --- a/migrator/alias_it_test.go +++ b/migrator/alias_it_test.go @@ -210,9 +210,11 @@ func testAlsITMigrateAndMove(t *testing.T) { if !reflect.DeepEqual(*attrProf, *result) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(attrProf), utils.ToJSON(result)) } - //check if old account was deleted - if _, err = alsMigrator.dmIN.getV1Alias(); err != utils.ErrNoMoreData { - t.Error("Error should be not found : ", err) + //check if old account was deleted (only if dmIN != dmOut) + if !alsMigrator.sameDataDB { + if _, err = alsMigrator.dmIN.getV1Alias(); err != utils.ErrNoMoreData { + t.Error("Error should be not found : ", err) + } } expAlsIdx := map[string]utils.StringSet{ diff --git a/migrator/cdrs.go b/migrator/cdrs.go index 54bd38fde..e865957a4 100755 --- a/migrator/cdrs.go +++ b/migrator/cdrs.go @@ -46,7 +46,7 @@ func (m *Migrator) migrateCurrentCDRs() (err error) { func (m *Migrator) migrateCDRs() (err error) { var vrs engine.Versions current := engine.CurrentStorDBVersions() - vrs, err = m.storDBIn.StorDB().GetVersions("") + vrs, err = m.storDBIn.StorDB().GetVersions(utils.EmptyString) if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, @@ -58,49 +58,73 @@ func (m *Migrator) migrateCDRs() (err error) { utils.UndefinedVersion, "version number is not defined for Actions") } - switch vrs[utils.CDRs] { - case 1: - if err = m.migrateV1CDRs(); err != nil { - return err - } - case current[utils.CDRs]: - if err = m.migrateCurrentCDRs(); err != nil { - return err - } - } - return m.ensureIndexesStorDB(engine.ColCDRs) -} - -func (m *Migrator) migrateV1CDRs() (err error) { - var v1CDR *v1Cdrs + migrated := true + var v2 *engine.CDR for { - v1CDR, err = m.storDBIn.getV1CDR() - if err != nil && err != utils.ErrNoMoreData { - return err + version := vrs[utils.CDRs] + for { + switch vrs[utils.CDRs] { + case current[utils.CDRs]: + migrated = false + if err = m.migrateCurrentCDRs(); err != nil { + return err + } + case 1: + if v2, err = m.migrateV1CDRs(); err != nil && err != utils.ErrNoMoreData { + return err + } + version = 2 + } + if version == current[utils.CDRs] || err == utils.ErrNoMoreData { + break + } } - if err == utils.ErrNoMoreData { + if err == utils.ErrNoMoreData || !migrated { break } - if v1CDR == nil || m.dryRun { - continue + + if !m.dryRun && migrated { + //set action plan + if err = m.storDBOut.StorDB().SetCDR(v2, true); err != nil { + return err + } } - cdr := v1CDR.V1toV2Cdr() - if err = m.storDBOut.StorDB().SetCDR(cdr, true); err != nil { - return err - } - m.stats[utils.CDRs] += 1 - } - if m.dryRun { - return + m.stats[utils.CDRs]++ } // All done, update version wtih current one - vrs := engine.Versions{utils.CDRs: engine.CurrentStorDBVersions()[utils.CDRs]} + vrs = engine.Versions{utils.CDRs: engine.CurrentStorDBVersions()[utils.CDRs]} if err = m.storDBOut.StorDB().SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), fmt.Sprintf("error: <%s> when updating CDRs version into StorDB", err.Error())) } + return m.ensureIndexesStorDB(engine.ColCDRs) +} + +func (m *Migrator) removeV1CDRs() (err error) { + var v1CDR *v1Cdrs + if v1CDR, err = m.storDBIn.getV1CDR(); err != nil { + return err + } + if v1CDR == nil { + return + } + if err = m.storDBIn.remV1CDRs(v1CDR); err != nil { + return + } + return +} + +func (m *Migrator) migrateV1CDRs() (cdr *engine.CDR, err error) { + var v1CDR *v1Cdrs + if v1CDR, err = m.storDBIn.getV1CDR(); err != nil { + return nil, err + } + if v1CDR == nil { + return + } + cdr = v1CDR.V1toV2Cdr() return } diff --git a/migrator/migrator_stordb.go b/migrator/migrator_stordb.go index 9912d2f90..a00544e78 100755 --- a/migrator/migrator_stordb.go +++ b/migrator/migrator_stordb.go @@ -25,6 +25,7 @@ import ( type MigratorStorDB interface { getV1CDR() (v1Cdr *v1Cdrs, err error) setV1CDR(v1Cdr *v1Cdrs) (err error) + remV1CDRs(v1Cdr *v1Cdrs) (err error) createV1SMCosts() (err error) renameV1SMCosts() (err error) getV2SMCost() (v2Cost *v2SessionsCost, err error) diff --git a/migrator/storage_map_stordb.go b/migrator/storage_map_stordb.go index 9dcbea9d3..55ab681be 100755 --- a/migrator/storage_map_stordb.go +++ b/migrator/storage_map_stordb.go @@ -54,6 +54,11 @@ func (iDBMig *internalStorDBMigrator) setV1CDR(v1Cdr *v1Cdrs) (err error) { return utils.ErrNotImplemented } +//rem +func (iDBMig *internalStorDBMigrator) remV1CDRs(v1Cdr *v1Cdrs) (err error) { + return utils.ErrNotImplemented +} + //SMCost methods //rename func (iDBMig *internalStorDBMigrator) renameV1SMCosts() (err error) { diff --git a/migrator/storage_mongo_stordb.go b/migrator/storage_mongo_stordb.go index 6421ecf9f..532d4fc2c 100644 --- a/migrator/storage_mongo_stordb.go +++ b/migrator/storage_mongo_stordb.go @@ -74,6 +74,12 @@ func (v1ms *mongoStorDBMigrator) setV1CDR(v1Cdr *v1Cdrs) (err error) { return } +//rem +func (v1ms *mongoStorDBMigrator) remV1CDRs(v1Cdr *v1Cdrs) (err error) { + _, err = v1ms.mgoDB.DB().Collection(engine.ColCDRs).DeleteOne(v1ms.mgoDB.GetContext(), v1Cdr) + return +} + //SMCost methods //rename func (v1ms *mongoStorDBMigrator) renameV1SMCosts() (err error) { diff --git a/migrator/storage_sql.go b/migrator/storage_sql.go index 8c603ce1b..646770b64 100755 --- a/migrator/storage_sql.go +++ b/migrator/storage_sql.go @@ -80,6 +80,22 @@ func (mgSQL *migratorSQL) setV1CDR(v1Cdr *v1Cdrs) (err error) { return nil } +//rem +func (mgSQL *migratorSQL) remV1CDRs(v1Cdr *v1Cdrs) (err error) { + tx := mgSQL.sqlStorage.ExportGormDB().Begin() + var rmParam *v1Cdrs + if v1Cdr != nil { + rmParam = &v1Cdrs{CGRID: v1Cdr.CGRID, + RunID: v1Cdr.RunID} + } + if err := tx.Where(rmParam).Delete(v1Cdrs{}).Error; err != nil { + tx.Rollback() + return err + } + tx.Commit() + return nil +} + func (mgSQL *migratorSQL) renameV1SMCosts() (err error) { qry := "RENAME TABLE sm_costs TO session_costs;" if mgSQL.StorDB().GetStorageType() == utils.POSTGRES {