mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Added set/get methods for stats v2
This commit is contained in:
committed by
Dan Christian Bogos
parent
afbd3679ef
commit
e097d79a9d
@@ -36,6 +36,8 @@ type MigratorDataDB interface {
|
||||
setV1SharedGroup(x *v1SharedGroup) (err error)
|
||||
getV1Stats() (v1st *v1Stat, err error)
|
||||
setV1Stats(x *v1Stat) (err error)
|
||||
getV2Stats() (v2 *engine.StatQueue, err error)
|
||||
setV2Stats(v2 *engine.StatQueue) (err error)
|
||||
getV2ActionTrigger() (v2at *v2ActionTrigger, err error)
|
||||
setV2ActionTrigger(x *v2ActionTrigger) (err error)
|
||||
getv2Account() (v2Acnt *v2Account, err error)
|
||||
|
||||
@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package migrator
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -162,39 +163,14 @@ func remakeQueue(sq *engine.StatQueue) (out *engine.StatQueue) {
|
||||
func (m *Migrator) migrateV2Stats(v2Stats *engine.StatQueue) (v3Stats *engine.StatQueue, err error) {
|
||||
if v2Stats == nil {
|
||||
// read from DB
|
||||
|
||||
}
|
||||
|
||||
var ids []string
|
||||
//StatQueue
|
||||
if ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueuePrefix); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, id := range ids {
|
||||
tntID := strings.SplitN(strings.TrimPrefix(id, utils.StatQueuePrefix), utils.InInFieldSep, 2)
|
||||
if len(tntID) < 2 {
|
||||
return nil, fmt.Errorf("Invalid key <%s> when migrating stat queues", id)
|
||||
}
|
||||
sgs, err := m.dmIN.DataManager().GetStatQueue(tntID[0], tntID[1], false, false, utils.NonTransactional)
|
||||
v2Stats, err = m.dmIN.getV2Stats()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if v2Stats == nil {
|
||||
return nil, errors.New("Stats NIL")
|
||||
}
|
||||
if sgs == nil || m.dryRun {
|
||||
continue
|
||||
}
|
||||
// if err = m.dmOut.DataManager().SetStatQueue(remakeQueue(sgs)); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// if err = m.dmIN.DataManager().RemoveStatQueue(tntID[0], tntID[1], utils.NonTransactional); err != nil {
|
||||
// return err
|
||||
// }
|
||||
m.stats[utils.StatS] += 1
|
||||
}
|
||||
|
||||
// if err = m.moveStatQueueProfile(); err != nil {
|
||||
// return err
|
||||
// }
|
||||
|
||||
v3Stats = remakeQueue(v2Stats)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -262,7 +238,7 @@ func (m *Migrator) migrateStats() (err error) {
|
||||
}
|
||||
}
|
||||
// Set the fresh-migrated Stats into DB
|
||||
if err = m.dmOut.DataManager().SetStatQueue(remakeQueue(v3Stats)); err != nil {
|
||||
if err = m.dmOut.DataManager().SetStatQueue(v3Stats); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,6 +129,15 @@ func (iDBMig *internalMigrator) setV1Stats(x *v1Stat) (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (iDBMig *internalMigrator) getV2Stats() (v2 *engine.StatQueue, err error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
//set
|
||||
func (iDBMig *internalMigrator) setV2Stats(v2 *engine.StatQueue) (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
//Action methods
|
||||
//get
|
||||
func (iDBMig *internalMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
|
||||
|
||||
@@ -280,6 +280,32 @@ func (v1ms *mongoMigrator) setV1Stats(x *v1Stat) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// get V2
|
||||
func (v1ms *mongoMigrator) getV2Stats() (v2 *engine.StatQueue, err error) {
|
||||
if v1ms.cursor == nil {
|
||||
v1ms.cursor, err = v1ms.mgoDB.DB().Collection(utils.StatQueuePrefix).Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
|
||||
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
|
||||
v1ms.cursor = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
v2 = new(engine.StatQueue)
|
||||
if err := (*v1ms.cursor).Decode(v2); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v2, nil
|
||||
}
|
||||
|
||||
// set v2
|
||||
func (v1ms *mongoMigrator) setV2Stats(v2 *engine.StatQueue) (err error) {
|
||||
_, err = v1ms.mgoDB.DB().Collection(utils.StatQueuePrefix).InsertOne(v1ms.mgoDB.GetContext(), v2)
|
||||
return
|
||||
}
|
||||
|
||||
//Stats methods
|
||||
//get
|
||||
func (v1ms *mongoMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
|
||||
|
||||
@@ -354,6 +354,46 @@ func (v1rs *redisMigrator) setV1Stats(x *v1Stat) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
//get
|
||||
func (v1rs *redisMigrator) getV2Stats() (v2 *engine.StatQueue, err error) {
|
||||
if v1rs.qryIdx == nil {
|
||||
v1rs.dataKeys, err = v1rs.rds.GetKeysForPrefix(utils.StatQueuePrefix)
|
||||
if err != nil {
|
||||
return
|
||||
} else if len(v1rs.dataKeys) == 0 {
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
v1rs.qryIdx = utils.IntPointer(0)
|
||||
}
|
||||
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
|
||||
strVal, err := v1rs.rds.Cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := v1rs.rds.Marshaler().Unmarshal(strVal, &v2); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
*v1rs.qryIdx = *v1rs.qryIdx + 1
|
||||
} else {
|
||||
v1rs.qryIdx = nil
|
||||
return nil, utils.ErrNoMoreData
|
||||
}
|
||||
return v2, nil
|
||||
}
|
||||
|
||||
//set
|
||||
func (v1rs *redisMigrator) setV2Stats(v2 *engine.StatQueue) (err error) {
|
||||
key := utils.StatQueuePrefix + v2.ID
|
||||
bit, err := v1rs.rds.Marshaler().Marshal(v2)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = v1rs.rds.Cmd("SET", key, bit).Err; err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
//Action methods
|
||||
//get
|
||||
func (v1rs *redisMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
|
||||
|
||||
Reference in New Issue
Block a user