Updated Stats migration

This commit is contained in:
adragusin
2020-06-03 17:59:56 +03:00
committed by Dan Christian Bogos
parent b8ba03bd57
commit afbd3679ef

View File

@@ -118,49 +118,29 @@ func (m *Migrator) migrateCurrentStats() (err error) {
return m.moveStatQueueProfile()
}
func (m *Migrator) migrateV1CDRSTATS() (err error) {
func (m *Migrator) migrateV1Stats() (filter *engine.Filter, v2Stats *engine.StatQueue, sts *engine.StatQueueProfile, err error) {
var v1Sts *v1Stat
//for {
v1Sts, err = m.dmIN.getV1Stats()
if err != nil {
return err
return nil, nil, nil, err
}
if v1Sts.Id != "" {
if v1Sts.Id != utils.EmptyString {
if len(v1Sts.Triggers) != 0 {
for _, Trigger := range v1Sts.Triggers {
if err := m.SasThreshold(Trigger); err != nil {
return err
return nil, nil, nil, err
}
}
}
filter, sq, sts, err := v1Sts.AsStatQP()
if err != nil {
return err
}
if err := m.dmOut.DataManager().SetFilter(filter); err != nil {
return err
}
if err := m.dmOut.DataManager().SetStatQueue(remakeQueue(sq)); err != nil {
return err
}
if err := m.dmOut.DataManager().SetStatQueueProfile(sts, true); err != nil {
return err
if filter, v2Stats, sts, err = v1Sts.AsStatQP(); err != nil {
return nil, nil, nil, err
}
// if err := m.dmOut.DataManager().SetStatQueue(remakeQueue(v2Stats)); err != nil {
// return nil, err
// }
m.stats[utils.StatS] += 1
}
//}
if m.dryRun {
return
}
// All done, update version wtih current one
vrs := engine.Versions{utils.StatS: engine.CurrentDataDBVersions()[utils.StatS]}
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Stats version into dataDB", err.Error()))
}
return
}
@@ -179,66 +159,63 @@ func remakeQueue(sq *engine.StatQueue) (out *engine.StatQueue) {
return
}
func (m *Migrator) migrateV2Stats() (err error) {
func (m *Migrator) migrateV2Stats(v2Stats *engine.StatQueue) (v3Stats *engine.StatQueue, err error) {
if v2Stats == nil {
// read from DB
}
var ids []string
//StatQueue
if ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueuePrefix); err != nil {
return err
return nil, err
}
for _, id := range ids {
tntID := strings.SplitN(strings.TrimPrefix(id, utils.StatQueuePrefix), utils.InInFieldSep, 2)
if len(tntID) < 2 {
return fmt.Errorf("Invalid key <%s> when migrating stat queues", id)
return nil, fmt.Errorf("Invalid key <%s> when migrating stat queues", id)
}
sgs, err := m.dmIN.DataManager().GetStatQueue(tntID[0], tntID[1], false, false, utils.NonTransactional)
if err != nil {
return err
return nil, err
}
if sgs == nil || m.dryRun {
continue
}
if err = m.dmOut.DataManager().SetStatQueue(remakeQueue(sgs)); err != nil {
return err
}
if err = m.dmIN.DataManager().RemoveStatQueue(tntID[0], tntID[1], utils.NonTransactional); err != nil {
return err
}
// if err = m.dmOut.DataManager().SetStatQueue(remakeQueue(sgs)); err != nil {
// return err
// }
// if err = m.dmIN.DataManager().RemoveStatQueue(tntID[0], tntID[1], utils.NonTransactional); err != nil {
// return err
// }
m.stats[utils.StatS] += 1
}
if err = m.moveStatQueueProfile(); err != nil {
return err
}
if m.dryRun {
return
}
// All done, update version wtih current one
vrs := engine.Versions{utils.StatS: engine.CurrentDataDBVersions()[utils.StatS]}
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Stats version into dataDB", err.Error()))
}
// if err = m.moveStatQueueProfile(); err != nil {
// return err
// }
return
}
func (m *Migrator) migrateStats() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions(utils.EmptyString)
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
utils.ServerErrorCaps, err.Error(),
fmt.Sprintf("error: <%s> when querying oldDataDB for versions", err.Error()))
} else if len(vrs) == 0 {
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.UndefinedVersion,
utils.MandatoryIEMissingCaps, utils.UndefinedVersion,
"version number is not defined for ActionTriggers model")
}
migrated := true
var filter *engine.Filter
var sts *engine.StatQueueProfile
var v2Stats *engine.StatQueue
var v3Stats *engine.StatQueue
for {
version := vrs[utils.StatS]
for {
@@ -253,28 +230,52 @@ func (m *Migrator) migrateStats() (err error) {
}
version = 3
migrated = false
case 1:
if err = m.migrateV1CDRSTATS(); err != nil {
case 1: // migrate from V1 to V2
if filter, v2Stats, sts, err = m.migrateV1Stats(); err != nil && err != utils.ErrNoMoreData {
return err
} else if err == utils.ErrNoMoreData {
break
}
version = 2
case 2:
if err = m.migrateV2Stats(); err != nil {
case 2: // migrate from V2 to V3 (actual)
if v3Stats, err = m.migrateV2Stats(v2Stats); err != nil && err != utils.ErrNoMoreData {
return err
} else if err == utils.ErrNoMoreData {
break
}
version = 3
}
if version == current[utils.StatS] || !migrated {
if version == current[utils.StatS] || err == utils.ErrNoMoreData {
break
}
}
if !m.dryRun && migrated {
// Set into db
if err == utils.ErrNoMoreData || !migrated {
break
}
if !m.dryRun && migrated {
if vrs[utils.StatS] == 1 {
if err := m.dmOut.DataManager().SetFilter(filter); err != nil {
return err
}
if err := m.dmOut.DataManager().SetStatQueueProfile(sts, true); err != nil {
return err
}
}
// Set the fresh-migrated Stats into DB
if err = m.dmOut.DataManager().SetStatQueue(remakeQueue(v3Stats)); err != nil {
return err
}
}
}
// call the remove function here
// All done, update version wtih current one
vrs = engine.Versions{utils.StatS: engine.CurrentDataDBVersions()[utils.StatS]}
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Stats version into dataDB", err.Error()))
}
return m.ensureIndexesDataDB(engine.ColSqs)
}