diff --git a/migrator/stats.go b/migrator/stats.go index 1cde74102..359a1f5b4 100644 --- a/migrator/stats.go +++ b/migrator/stats.go @@ -118,49 +118,29 @@ func (m *Migrator) migrateCurrentStats() (err error) { return m.moveStatQueueProfile() } -func (m *Migrator) migrateV1CDRSTATS() (err error) { +func (m *Migrator) migrateV1Stats() (filter *engine.Filter, v2Stats *engine.StatQueue, sts *engine.StatQueueProfile, err error) { var v1Sts *v1Stat - //for { v1Sts, err = m.dmIN.getV1Stats() if err != nil { - return err + return nil, nil, nil, err } - if v1Sts.Id != "" { + if v1Sts.Id != utils.EmptyString { if len(v1Sts.Triggers) != 0 { for _, Trigger := range v1Sts.Triggers { if err := m.SasThreshold(Trigger); err != nil { - return err + return nil, nil, nil, err } } } - filter, sq, sts, err := v1Sts.AsStatQP() - if err != nil { - return err - } - if err := m.dmOut.DataManager().SetFilter(filter); err != nil { - return err - } - if err := m.dmOut.DataManager().SetStatQueue(remakeQueue(sq)); err != nil { - return err - } - if err := m.dmOut.DataManager().SetStatQueueProfile(sts, true); err != nil { - return err + if filter, v2Stats, sts, err = v1Sts.AsStatQP(); err != nil { + return nil, nil, nil, err } + // if err := m.dmOut.DataManager().SetStatQueue(remakeQueue(v2Stats)); err != nil { + // return nil, err + // } m.stats[utils.StatS] += 1 } - //} - if m.dryRun { - return - } - // All done, update version wtih current one - vrs := engine.Versions{utils.StatS: engine.CurrentDataDBVersions()[utils.StatS]} - if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Stats version into dataDB", err.Error())) - } return } @@ -179,66 +159,63 @@ func remakeQueue(sq *engine.StatQueue) (out *engine.StatQueue) { return } -func (m *Migrator) migrateV2Stats() (err error) { +func (m *Migrator) migrateV2Stats(v2Stats *engine.StatQueue) (v3Stats *engine.StatQueue, err error) { + if v2Stats == nil { + // read from DB + + } + var ids []string //StatQueue if ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueuePrefix); err != nil { - return err + return nil, err } for _, id := range ids { tntID := strings.SplitN(strings.TrimPrefix(id, utils.StatQueuePrefix), utils.InInFieldSep, 2) if len(tntID) < 2 { - return fmt.Errorf("Invalid key <%s> when migrating stat queues", id) + return nil, fmt.Errorf("Invalid key <%s> when migrating stat queues", id) } sgs, err := m.dmIN.DataManager().GetStatQueue(tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { - return err + return nil, err } if sgs == nil || m.dryRun { continue } - if err = m.dmOut.DataManager().SetStatQueue(remakeQueue(sgs)); err != nil { - return err - } - if err = m.dmIN.DataManager().RemoveStatQueue(tntID[0], tntID[1], utils.NonTransactional); err != nil { - return err - } + // if err = m.dmOut.DataManager().SetStatQueue(remakeQueue(sgs)); err != nil { + // return err + // } + // if err = m.dmIN.DataManager().RemoveStatQueue(tntID[0], tntID[1], utils.NonTransactional); err != nil { + // return err + // } m.stats[utils.StatS] += 1 } - if err = m.moveStatQueueProfile(); err != nil { - return err - } - if m.dryRun { - return - } - // All done, update version wtih current one - vrs := engine.Versions{utils.StatS: engine.CurrentDataDBVersions()[utils.StatS]} - if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Stats version into dataDB", err.Error())) - } + // if err = m.moveStatQueueProfile(); err != nil { + // return err + // } + return } func (m *Migrator) migrateStats() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions(utils.EmptyString) if err != nil { return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), + utils.ServerErrorCaps, err.Error(), fmt.Sprintf("error: <%s> when querying oldDataDB for versions", err.Error())) } else if len(vrs) == 0 { return utils.NewCGRError(utils.Migrator, - utils.MandatoryIEMissingCaps, - utils.UndefinedVersion, + utils.MandatoryIEMissingCaps, utils.UndefinedVersion, "version number is not defined for ActionTriggers model") } migrated := true + var filter *engine.Filter + var sts *engine.StatQueueProfile + var v2Stats *engine.StatQueue + var v3Stats *engine.StatQueue for { version := vrs[utils.StatS] for { @@ -253,28 +230,52 @@ func (m *Migrator) migrateStats() (err error) { } version = 3 migrated = false - case 1: - if err = m.migrateV1CDRSTATS(); err != nil { + case 1: // migrate from V1 to V2 + if filter, v2Stats, sts, err = m.migrateV1Stats(); err != nil && err != utils.ErrNoMoreData { return err } else if err == utils.ErrNoMoreData { break } version = 2 - case 2: - if err = m.migrateV2Stats(); err != nil { + case 2: // migrate from V2 to V3 (actual) + if v3Stats, err = m.migrateV2Stats(v2Stats); err != nil && err != utils.ErrNoMoreData { return err } else if err == utils.ErrNoMoreData { break } version = 3 } - if version == current[utils.StatS] || !migrated { + if version == current[utils.StatS] || err == utils.ErrNoMoreData { break } } - if !m.dryRun && migrated { - // Set into db + if err == utils.ErrNoMoreData || !migrated { + break } + if !m.dryRun && migrated { + if vrs[utils.StatS] == 1 { + if err := m.dmOut.DataManager().SetFilter(filter); err != nil { + return err + } + if err := m.dmOut.DataManager().SetStatQueueProfile(sts, true); err != nil { + return err + } + } + // Set the fresh-migrated Stats into DB + if err = m.dmOut.DataManager().SetStatQueue(remakeQueue(v3Stats)); err != nil { + return err + } + } + } + // call the remove function here + + // All done, update version wtih current one + vrs = engine.Versions{utils.StatS: engine.CurrentDataDBVersions()[utils.StatS]} + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when updating Stats version into dataDB", err.Error())) } return m.ensureIndexesDataDB(engine.ColSqs) }