From e097d79a9da98ab6b5057184775803b44045a598 Mon Sep 17 00:00:00 2001 From: adragusin Date: Thu, 4 Jun 2020 18:00:05 +0300 Subject: [PATCH] Added set/get methods for stats v2 --- migrator/migrator_datadb.go | 2 ++ migrator/stats.go | 36 +++++----------------------- migrator/storage_map_datadb.go | 9 +++++++ migrator/storage_mongo_datadb.go | 26 +++++++++++++++++++++ migrator/storage_redis.go | 40 ++++++++++++++++++++++++++++++++ 5 files changed, 83 insertions(+), 30 deletions(-) diff --git a/migrator/migrator_datadb.go b/migrator/migrator_datadb.go index 22593c213..a724d1c8b 100644 --- a/migrator/migrator_datadb.go +++ b/migrator/migrator_datadb.go @@ -36,6 +36,8 @@ type MigratorDataDB interface { setV1SharedGroup(x *v1SharedGroup) (err error) getV1Stats() (v1st *v1Stat, err error) setV1Stats(x *v1Stat) (err error) + getV2Stats() (v2 *engine.StatQueue, err error) + setV2Stats(v2 *engine.StatQueue) (err error) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) setV2ActionTrigger(x *v2ActionTrigger) (err error) getv2Account() (v2Acnt *v2Account, err error) diff --git a/migrator/stats.go b/migrator/stats.go index 359a1f5b4..37710607b 100644 --- a/migrator/stats.go +++ b/migrator/stats.go @@ -19,6 +19,7 @@ along with this program. If not, see package migrator import ( + "errors" "fmt" "strconv" "strings" @@ -162,39 +163,14 @@ func remakeQueue(sq *engine.StatQueue) (out *engine.StatQueue) { func (m *Migrator) migrateV2Stats(v2Stats *engine.StatQueue) (v3Stats *engine.StatQueue, err error) { if v2Stats == nil { // read from DB - - } - - var ids []string - //StatQueue - if ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueuePrefix); err != nil { - return nil, err - } - for _, id := range ids { - tntID := strings.SplitN(strings.TrimPrefix(id, utils.StatQueuePrefix), utils.InInFieldSep, 2) - if len(tntID) < 2 { - return nil, fmt.Errorf("Invalid key <%s> when migrating stat queues", id) - } - sgs, err := m.dmIN.DataManager().GetStatQueue(tntID[0], tntID[1], false, false, utils.NonTransactional) + v2Stats, err = m.dmIN.getV2Stats() if err != nil { return nil, err + } else if v2Stats == nil { + return nil, errors.New("Stats NIL") } - if sgs == nil || m.dryRun { - continue - } - // if err = m.dmOut.DataManager().SetStatQueue(remakeQueue(sgs)); err != nil { - // return err - // } - // if err = m.dmIN.DataManager().RemoveStatQueue(tntID[0], tntID[1], utils.NonTransactional); err != nil { - // return err - // } - m.stats[utils.StatS] += 1 } - - // if err = m.moveStatQueueProfile(); err != nil { - // return err - // } - + v3Stats = remakeQueue(v2Stats) return } @@ -262,7 +238,7 @@ func (m *Migrator) migrateStats() (err error) { } } // Set the fresh-migrated Stats into DB - if err = m.dmOut.DataManager().SetStatQueue(remakeQueue(v3Stats)); err != nil { + if err = m.dmOut.DataManager().SetStatQueue(v3Stats); err != nil { return err } } diff --git a/migrator/storage_map_datadb.go b/migrator/storage_map_datadb.go index 050d0ff9d..15406be93 100755 --- a/migrator/storage_map_datadb.go +++ b/migrator/storage_map_datadb.go @@ -129,6 +129,15 @@ func (iDBMig *internalMigrator) setV1Stats(x *v1Stat) (err error) { return utils.ErrNotImplemented } +func (iDBMig *internalMigrator) getV2Stats() (v2 *engine.StatQueue, err error) { + return nil, utils.ErrNotImplemented +} + +//set +func (iDBMig *internalMigrator) setV2Stats(v2 *engine.StatQueue) (err error) { + return utils.ErrNotImplemented +} + //Action methods //get func (iDBMig *internalMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) { diff --git a/migrator/storage_mongo_datadb.go b/migrator/storage_mongo_datadb.go index f7f1cc312..562fb8af9 100644 --- a/migrator/storage_mongo_datadb.go +++ b/migrator/storage_mongo_datadb.go @@ -280,6 +280,32 @@ func (v1ms *mongoMigrator) setV1Stats(x *v1Stat) (err error) { return } +// get V2 +func (v1ms *mongoMigrator) getV2Stats() (v2 *engine.StatQueue, err error) { + if v1ms.cursor == nil { + v1ms.cursor, err = v1ms.mgoDB.DB().Collection(utils.StatQueuePrefix).Find(v1ms.mgoDB.GetContext(), bson.D{}) + if err != nil { + return nil, err + } + } + if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) { + (*v1ms.cursor).Close(v1ms.mgoDB.GetContext()) + v1ms.cursor = nil + return nil, utils.ErrNoMoreData + } + v2 = new(engine.StatQueue) + if err := (*v1ms.cursor).Decode(v2); err != nil { + return nil, err + } + return v2, nil +} + +// set v2 +func (v1ms *mongoMigrator) setV2Stats(v2 *engine.StatQueue) (err error) { + _, err = v1ms.mgoDB.DB().Collection(utils.StatQueuePrefix).InsertOne(v1ms.mgoDB.GetContext(), v2) + return +} + //Stats methods //get func (v1ms *mongoMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) { diff --git a/migrator/storage_redis.go b/migrator/storage_redis.go index 09b67c8f3..91b0bf2b1 100644 --- a/migrator/storage_redis.go +++ b/migrator/storage_redis.go @@ -354,6 +354,46 @@ func (v1rs *redisMigrator) setV1Stats(x *v1Stat) (err error) { return } +//get +func (v1rs *redisMigrator) getV2Stats() (v2 *engine.StatQueue, err error) { + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.StatQueuePrefix) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNoMoreData + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v2); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + } else { + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData + } + return v2, nil +} + +//set +func (v1rs *redisMigrator) setV2Stats(v2 *engine.StatQueue) (err error) { + key := utils.StatQueuePrefix + v2.ID + bit, err := v1rs.rds.Marshaler().Marshal(v2) + if err != nil { + return err + } + if err = v1rs.rds.Cmd("SET", key, bit).Err; err != nil { + return err + } + return +} + //Action methods //get func (v1rs *redisMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {