Added new mongo driver to migrator

This commit is contained in:
Trial97
2018-12-06 12:55:08 +02:00
committed by Dan Christian Bogos
parent 4f8b4b2f45
commit 709c319c57
8 changed files with 548 additions and 71 deletions

View File

@@ -67,7 +67,7 @@ func InitStorDb(cfg *config.CGRConfig) error {
cfg.StorDbCfg().StorDBType)); err != nil {
return err
}
if utils.IsSliceMember([]string{utils.MYSQL, utils.POSTGRES},
if utils.IsSliceMember([]string{utils.MYSQL, utils.POSTGRES, utils.MONGO},
cfg.StorDbCfg().StorDBType) {
if err := SetDBVersions(storDb); err != nil {
return err

View File

@@ -34,7 +34,6 @@ import (
var (
rdsITdb *RedisStorage
mgoITdb *MongoStorage
mgoITdb2 *MongoStorageNew
onStor *DataManager
onStorCfg string
sleepDelay time.Duration
@@ -107,43 +106,23 @@ func TestOnStorITRedis(t *testing.T) {
}
}
// func TestOnStorITMongo(t *testing.T) {
// sleepDelay = 500 * time.Millisecond
// cdrsMongoCfgPath := path.Join(*dataDir, "conf", "samples", "cdrsv2mongo")
// mgoITCfg, err := config.NewCGRConfigFromFolder(cdrsMongoCfgPath)
// if err != nil {
// t.Fatal(err)
// }
// if mgoITdb, err = NewMongoStorage(mgoITCfg.StorDbCfg().StorDBHost,
// mgoITCfg.StorDbCfg().StorDBPort, mgoITCfg.StorDbCfg().StorDBName,
// mgoITCfg.StorDbCfg().StorDBUser, mgoITCfg.StorDbCfg().StorDBPass,
// utils.StorDB, nil, mgoITCfg.CacheCfg()); err != nil {
// t.Fatal(err)
// }
// onStorCfg = mgoITCfg.StorDbCfg().StorDBName
// onStor = NewDataManager(mgoITdb)
// for _, stest := range sTestsOnStorIT {
// t.Run("TestOnStorITMongo", stest)
// }
// }
func TestOnStorITMongo2(t *testing.T) {
func TestOnStorITMongo(t *testing.T) {
sleepDelay = 500 * time.Millisecond
cdrsMongoCfgPath := path.Join(*dataDir, "conf", "samples", "cdrsv2mongo")
mgoITCfg, err := config.NewCGRConfigFromFolder(cdrsMongoCfgPath)
if err != nil {
t.Fatal(err)
}
if mgoITdb2, err = NewMongoStorage(mgoITCfg.StorDbCfg().StorDBHost,
if mgoITdb, err = NewMongoStorage(mgoITCfg.StorDbCfg().StorDBHost,
mgoITCfg.StorDbCfg().StorDBPort, mgoITCfg.StorDbCfg().StorDBName,
mgoITCfg.StorDbCfg().StorDBUser, mgoITCfg.StorDbCfg().StorDBPass,
utils.StorDB, nil, mgoITCfg.CacheCfg()); err != nil {
t.Fatal(err)
}
onStorCfg = mgoITCfg.StorDbCfg().StorDBName
onStor = NewDataManager(mgoITdb2)
onStor = NewDataManager(mgoITdb)
for _, stest := range sTestsOnStorIT {
t.Run("TestOnStorITMongoNew", stest)
t.Run("TestOnStorITMongo", stest)
}
}

View File

@@ -73,12 +73,32 @@ func NewMongoStorage(host, port, db, user, pass, storageType string,
cacheCfg: cacheCfg,
cdrsIndexes: cdrsIndexes,
}
if col, err := ms.client.Database(db).ListCollections(ms.ctx, nil); err != nil {
return nil, err
} else if !col.Next(ctx) { // create indexes only if database is empty
if err = ms.EnsureIndexes(); err != nil {
return nil, err
if err = ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) error {
if col, err := ms.client.Database(db).ListCollections(sctx, nil, options.ListCollections().SetNameOnly(true)); err != nil {
return err
} else {
empty := true
for col.Next(sctx) { // create indexes only if database is empty or only version table is present
var elem struct{ Name string }
err := col.Decode(&elem)
if err != nil {
return err
}
if elem.Name != colVer {
empty = false
break
}
}
col.Close(sctx)
if empty {
if err = ms.EnsureIndexes(); err != nil {
return err
}
}
return nil
}
}); err != nil {
return nil, err
}
ms.cnter = utils.NewCounter(time.Now().UnixNano(), 0)
return
@@ -102,9 +122,7 @@ func (ms *MongoStorage) EnusureIndex(colName string, uniq bool, keys ...string)
col := ms.getCol(colName)
io := mongo.NewIndexOptionsBuilder().Unique(uniq)
var doc bsonx.Doc
// elem := make([]bson.M, len(keys))
for _, k := range keys {
// elem[i] = bson.M{k: 1}
doc = doc.Append(k, bsonx.Int32(1))
}
_, err := col.Indexes().CreateOne(sctx, mongo.IndexModel{
@@ -119,6 +137,10 @@ func (ms *MongoStorage) getCol(col string) *mongo.Collection {
return ms.client.Database(ms.db).Collection(col)
}
func (ms *MongoStorage) GetContext() context.Context {
return ms.ctx
}
// EnsureIndexes creates db indexes
func (ms *MongoStorage) EnsureIndexes() (err error) {
if ms.storageType == utils.DataDB {

View File

@@ -92,6 +92,7 @@ func SetDBVersions(storage Storage) (err error) {
// no data, write version
if err = storage.SetVersions(x, false); err != nil {
utils.Logger.Warning(fmt.Sprintf("Could not write current version to db: %v", err))
return err
}
return
}

View File

@@ -32,7 +32,7 @@ const (
v2ThresholdProfileCol = "threshold_profiles"
)
type mongoMigrator struct {
type mongoMigratorOld struct {
dm *engine.DataManager
mgoDB *engine.MongoStorageOld
qryIter *mgo.Iter
@@ -47,22 +47,22 @@ type AtKeyValue struct {
Value v1ActionPlans
}
func newMongoMigrator(dm *engine.DataManager) (mgoMig *mongoMigrator) {
return &mongoMigrator{
func newmongoMigratorOldOld(dm *engine.DataManager) (mgoMig *mongoMigratorOld) {
return &mongoMigratorOld{
dm: dm,
mgoDB: dm.DataDB().(*engine.MongoStorageOld),
qryIter: nil,
}
}
func (mgoMig *mongoMigrator) DataManager() *engine.DataManager {
func (mgoMig *mongoMigratorOld) DataManager() *engine.DataManager {
return mgoMig.dm
}
//Account methods
//V1
//get
func (v1ms *mongoMigrator) getv1Account() (v1Acnt *v1Account, err error) {
func (v1ms *mongoMigratorOld) getv1Account() (v1Acnt *v1Account, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.mgoDB.DB().C(v1AccountDBPrefix).Find(nil).Iter()
}
@@ -77,7 +77,7 @@ func (v1ms *mongoMigrator) getv1Account() (v1Acnt *v1Account, err error) {
}
//set
func (v1ms *mongoMigrator) setV1Account(x *v1Account) (err error) {
func (v1ms *mongoMigratorOld) setV1Account(x *v1Account) (err error) {
if err := v1ms.mgoDB.DB().C(v1AccountDBPrefix).Insert(x); err != nil {
return err
}
@@ -85,13 +85,13 @@ func (v1ms *mongoMigrator) setV1Account(x *v1Account) (err error) {
}
//rem
func (v1ms *mongoMigrator) remV1Account(id string) (err error) {
func (v1ms *mongoMigratorOld) remV1Account(id string) (err error) {
return v1ms.mgoDB.DB().C(v1AccountDBPrefix).Remove(bson.M{"id": id})
}
//V2
//get
func (v1ms *mongoMigrator) getv2Account() (v2Acnt *v2Account, err error) {
func (v1ms *mongoMigratorOld) getv2Account() (v2Acnt *v2Account, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.mgoDB.DB().C(v2AccountsCol).Find(nil).Iter()
}
@@ -106,7 +106,7 @@ func (v1ms *mongoMigrator) getv2Account() (v2Acnt *v2Account, err error) {
}
//set
func (v1ms *mongoMigrator) setV2Account(x *v2Account) (err error) {
func (v1ms *mongoMigratorOld) setV2Account(x *v2Account) (err error) {
if err := v1ms.mgoDB.DB().C(v2AccountsCol).Insert(x); err != nil {
return err
}
@@ -114,13 +114,13 @@ func (v1ms *mongoMigrator) setV2Account(x *v2Account) (err error) {
}
//rem
func (v1ms *mongoMigrator) remV2Account(id string) (err error) {
func (v1ms *mongoMigratorOld) remV2Account(id string) (err error) {
return v1ms.mgoDB.DB().C(v2AccountsCol).Remove(bson.M{"id": id})
}
//Action methods
//get
func (v1ms *mongoMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
func (v1ms *mongoMigratorOld) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
var strct *AtKeyValue
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.mgoDB.DB().C("actiontimings").Find(nil).Iter()
@@ -135,7 +135,7 @@ func (v1ms *mongoMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error)
}
//set
func (v1ms *mongoMigrator) setV1ActionPlans(x *v1ActionPlans) (err error) {
func (v1ms *mongoMigratorOld) setV1ActionPlans(x *v1ActionPlans) (err error) {
key := utils.ACTION_PLAN_PREFIX + (*x)[0].Id
if err := v1ms.mgoDB.DB().C("actiontimings").Insert(&AtKeyValue{key, *x}); err != nil {
return err
@@ -145,7 +145,7 @@ func (v1ms *mongoMigrator) setV1ActionPlans(x *v1ActionPlans) (err error) {
//Actions methods
//get
func (v1ms *mongoMigrator) getV1Actions() (v1acs *v1Actions, err error) {
func (v1ms *mongoMigratorOld) getV1Actions() (v1acs *v1Actions, err error) {
var strct *AcKeyValue
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.mgoDB.DB().C("actions").Find(nil).Iter()
@@ -161,7 +161,7 @@ func (v1ms *mongoMigrator) getV1Actions() (v1acs *v1Actions, err error) {
}
//set
func (v1ms *mongoMigrator) setV1Actions(x *v1Actions) (err error) {
func (v1ms *mongoMigratorOld) setV1Actions(x *v1Actions) (err error) {
key := utils.ACTION_PREFIX + (*x)[0].Id
if err := v1ms.mgoDB.DB().C("actions").Insert(&AcKeyValue{key, *x}); err != nil {
return err
@@ -171,18 +171,18 @@ func (v1ms *mongoMigrator) setV1Actions(x *v1Actions) (err error) {
//ActionTriggers methods
//get
func (v1ms *mongoMigrator) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {
func (v1ms *mongoMigratorOld) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {
return nil, utils.ErrNotImplemented
}
//set
func (v1ms *mongoMigrator) setV1ActionTriggers(x *v1ActionTriggers) (err error) {
func (v1ms *mongoMigratorOld) setV1ActionTriggers(x *v1ActionTriggers) (err error) {
return utils.ErrNotImplemented
}
//Actions methods
//get
func (v1ms *mongoMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
func (v1ms *mongoMigratorOld) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.mgoDB.DB().C(utils.SHARED_GROUP_PREFIX).Find(nil).Iter()
}
@@ -196,7 +196,7 @@ func (v1ms *mongoMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
}
//set
func (v1ms *mongoMigrator) setV1SharedGroup(x *v1SharedGroup) (err error) {
func (v1ms *mongoMigratorOld) setV1SharedGroup(x *v1SharedGroup) (err error) {
if err := v1ms.mgoDB.DB().C(utils.SHARED_GROUP_PREFIX).Insert(x); err != nil {
return err
}
@@ -205,7 +205,7 @@ func (v1ms *mongoMigrator) setV1SharedGroup(x *v1SharedGroup) (err error) {
//Stats methods
//get
func (v1ms *mongoMigrator) getV1Stats() (v1st *v1Stat, err error) {
func (v1ms *mongoMigratorOld) getV1Stats() (v1st *v1Stat, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.mgoDB.DB().C(utils.CDR_STATS_PREFIX).Find(nil).Iter()
}
@@ -219,7 +219,7 @@ func (v1ms *mongoMigrator) getV1Stats() (v1st *v1Stat, err error) {
}
//set
func (v1ms *mongoMigrator) setV1Stats(x *v1Stat) (err error) {
func (v1ms *mongoMigratorOld) setV1Stats(x *v1Stat) (err error) {
if err := v1ms.mgoDB.DB().C(utils.CDR_STATS_PREFIX).Insert(x); err != nil {
return err
}
@@ -228,7 +228,7 @@ func (v1ms *mongoMigrator) setV1Stats(x *v1Stat) (err error) {
//Stats methods
//get
func (v1ms *mongoMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
func (v1ms *mongoMigratorOld) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.mgoDB.DB().C(v1ActionTriggersCol).Find(nil).Iter()
}
@@ -242,7 +242,7 @@ func (v1ms *mongoMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err erro
}
//set
func (v1ms *mongoMigrator) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
func (v1ms *mongoMigratorOld) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
if err := v1ms.mgoDB.DB().C(v1ActionTriggersCol).Insert(x); err != nil {
return err
}
@@ -251,7 +251,7 @@ func (v1ms *mongoMigrator) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
//AttributeProfile methods
//get
func (v1ms *mongoMigrator) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) {
func (v1ms *mongoMigratorOld) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.mgoDB.DB().C(v1AttributeProfilesCol).Find(nil).Iter()
}
@@ -265,7 +265,7 @@ func (v1ms *mongoMigrator) getV1AttributeProfile() (v1attrPrf *v1AttributeProfil
}
//set
func (v1ms *mongoMigrator) setV1AttributeProfile(x *v1AttributeProfile) (err error) {
func (v1ms *mongoMigratorOld) setV1AttributeProfile(x *v1AttributeProfile) (err error) {
if err := v1ms.mgoDB.DB().C(v1AttributeProfilesCol).Insert(x); err != nil {
return err
}
@@ -274,7 +274,7 @@ func (v1ms *mongoMigrator) setV1AttributeProfile(x *v1AttributeProfile) (err err
//ThresholdProfile methods
//get
func (v1ms *mongoMigrator) getV2ThresholdProfile() (v2T *v2Threshold, err error) {
func (v1ms *mongoMigratorOld) getV2ThresholdProfile() (v2T *v2Threshold, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.mgoDB.DB().C(v2ThresholdProfileCol).Find(nil).Iter()
}
@@ -288,7 +288,7 @@ func (v1ms *mongoMigrator) getV2ThresholdProfile() (v2T *v2Threshold, err error)
}
//set
func (v1ms *mongoMigrator) setV2ThresholdProfile(x *v2Threshold) (err error) {
func (v1ms *mongoMigratorOld) setV2ThresholdProfile(x *v2Threshold) (err error) {
if err := v1ms.mgoDB.DB().C(v2ThresholdProfileCol).Insert(x); err != nil {
return err
}
@@ -296,6 +296,6 @@ func (v1ms *mongoMigrator) setV2ThresholdProfile(x *v2Threshold) (err error) {
}
//rem
func (v1ms *mongoMigrator) remV2ThresholdProfile(tenant, id string) (err error) {
func (v1ms *mongoMigratorOld) remV2ThresholdProfile(tenant, id string) (err error) {
return v1ms.mgoDB.DB().C(v2ThresholdProfileCol).Remove(bson.M{"tenant": tenant, "id": id})
}

View File

@@ -25,27 +25,27 @@ import (
"github.com/cgrates/mgo/bson"
)
func newMongoStorDBMigrator(stor engine.StorDB) (mgoMig *mongoStorDBMigrator) {
return &mongoStorDBMigrator{
func newmongoStorDBMigratorOldOld(stor engine.StorDB) (mgoMig *mongoStorDBMigratorOld) {
return &mongoStorDBMigratorOld{
storDB: &stor,
mgoDB: stor.(*engine.MongoStorageOld),
qryIter: nil,
}
}
type mongoStorDBMigrator struct {
type mongoStorDBMigratorOld struct {
storDB *engine.StorDB
mgoDB *engine.MongoStorageOld
qryIter *mgo.Iter
}
func (mgoMig *mongoStorDBMigrator) StorDB() engine.StorDB {
func (mgoMig *mongoStorDBMigratorOld) StorDB() engine.StorDB {
return *mgoMig.storDB
}
//CDR methods
//get
func (v1ms *mongoStorDBMigrator) getV1CDR() (v1Cdr *v1Cdrs, err error) {
func (v1ms *mongoStorDBMigratorOld) getV1CDR() (v1Cdr *v1Cdrs, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.mgoDB.DB().C(engine.ColCDRs).Find(nil).Iter()
}
@@ -60,7 +60,7 @@ func (v1ms *mongoStorDBMigrator) getV1CDR() (v1Cdr *v1Cdrs, err error) {
}
//set
func (v1ms *mongoStorDBMigrator) setV1CDR(v1Cdr *v1Cdrs) (err error) {
func (v1ms *mongoStorDBMigratorOld) setV1CDR(v1Cdr *v1Cdrs) (err error) {
if err = v1ms.mgoDB.DB().C(engine.ColCDRs).Insert(v1Cdr); err != nil {
return err
}
@@ -69,7 +69,7 @@ func (v1ms *mongoStorDBMigrator) setV1CDR(v1Cdr *v1Cdrs) (err error) {
//SMCost methods
//rename
func (v1ms *mongoStorDBMigrator) renameV1SMCosts() (err error) {
func (v1ms *mongoStorDBMigratorOld) renameV1SMCosts() (err error) {
if err = v1ms.mgoDB.DB().C(utils.OldSMCosts).DropCollection(); err != nil {
return err
}
@@ -77,7 +77,7 @@ func (v1ms *mongoStorDBMigrator) renameV1SMCosts() (err error) {
return v1ms.mgoDB.DB().Run(bson.D{{"create", utils.SessionsCostsTBL}}, result)
}
func (v1ms *mongoStorDBMigrator) createV1SMCosts() (err error) {
func (v1ms *mongoStorDBMigratorOld) createV1SMCosts() (err error) {
err = v1ms.mgoDB.DB().C(utils.OldSMCosts).DropCollection()
err = v1ms.mgoDB.DB().C(utils.SessionsCostsTBL).DropCollection()
result := make(map[string]string)
@@ -86,7 +86,7 @@ func (v1ms *mongoStorDBMigrator) createV1SMCosts() (err error) {
}
//get
func (v1ms *mongoStorDBMigrator) getV2SMCost() (v2Cost *v2SessionsCost, err error) {
func (v1ms *mongoStorDBMigratorOld) getV2SMCost() (v2Cost *v2SessionsCost, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.mgoDB.DB().C(utils.SessionsCostsTBL).Find(nil).Iter()
}
@@ -101,7 +101,7 @@ func (v1ms *mongoStorDBMigrator) getV2SMCost() (v2Cost *v2SessionsCost, err erro
}
//set
func (v1ms *mongoStorDBMigrator) setV2SMCost(v2Cost *v2SessionsCost) (err error) {
func (v1ms *mongoStorDBMigratorOld) setV2SMCost(v2Cost *v2SessionsCost) (err error) {
if err = v1ms.mgoDB.DB().C(utils.SessionsCostsTBL).Insert(v2Cost); err != nil {
return err
}
@@ -109,7 +109,7 @@ func (v1ms *mongoStorDBMigrator) setV2SMCost(v2Cost *v2SessionsCost) (err error)
}
//remove
func (v1ms *mongoStorDBMigrator) remV2SMCost(v2Cost *v2SessionsCost) (err error) {
func (v1ms *mongoStorDBMigratorOld) remV2SMCost(v2Cost *v2SessionsCost) (err error) {
if err = v1ms.mgoDB.DB().C(utils.SessionsCostsTBL).Remove(nil); err != nil {
return err
}

View File

@@ -0,0 +1,350 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
// "github.com/cgrates/mgo"
// "github.com/cgrates/mgo/bson"
"github.com/mongodb/mongo-go-driver/bson"
"github.com/mongodb/mongo-go-driver/mongo"
)
// const (
// v2AccountsCol = "accounts"
// v1ActionTriggersCol = "action_triggers"
// v1AttributeProfilesCol = "attribute_profiles"
// v2ThresholdProfileCol = "threshold_profiles"
// )
type mongoMigrator struct {
dm *engine.DataManager
mgoDB *engine.MongoStorage
cursor *mongo.Cursor
}
// type AcKeyValue struct {
// Key string
// Value v1Actions
// }
// type AtKeyValue struct {
// Key string
// Value v1ActionPlans
// }
func newMongoMigrator(dm *engine.DataManager) (mgoMig *mongoMigrator) {
return &mongoMigrator{
dm: dm,
mgoDB: dm.DataDB().(*engine.MongoStorage),
cursor: nil,
}
}
func (mgoMig *mongoMigrator) DataManager() *engine.DataManager {
return mgoMig.dm
}
//Account methods
//V1
//get
func (v1ms *mongoMigrator) getv1Account() (v1Acnt *v1Account, err error) {
if v1ms.cursor == nil {
var cursor mongo.Cursor
cursor, err = v1ms.mgoDB.DB().Collection(v1AccountDBPrefix).Find(v1ms.mgoDB.GetContext(), nil)
if err != nil {
return nil, err
}
v1ms.cursor = &cursor
}
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
v1ms.cursor = nil
return nil, utils.ErrNoMoreData
}
if err := (*v1ms.cursor).Decode(v1Acnt); err != nil {
return nil, err
}
return v1Acnt, nil
}
//set
func (v1ms *mongoMigrator) setV1Account(x *v1Account) (err error) {
_, err = v1ms.mgoDB.DB().Collection(v1AccountDBPrefix).InsertOne(v1ms.mgoDB.GetContext(), x)
return
}
//rem
func (v1ms *mongoMigrator) remV1Account(id string) (err error) {
_, err = v1ms.mgoDB.DB().Collection(v1AccountDBPrefix).DeleteOne(v1ms.mgoDB.GetContext(), bson.M{"id": id})
return
}
//V2
//get
func (v1ms *mongoMigrator) getv2Account() (v2Acnt *v2Account, err error) {
if v1ms.cursor == nil {
var cursor mongo.Cursor
cursor, err = v1ms.mgoDB.DB().Collection(v2AccountsCol).Find(v1ms.mgoDB.GetContext(), nil)
if err != nil {
return nil, err
}
v1ms.cursor = &cursor
}
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
v1ms.cursor = nil
return nil, utils.ErrNoMoreData
}
if err := (*v1ms.cursor).Decode(v2Acnt); err != nil {
return nil, err
}
return v2Acnt, nil
}
//set
func (v1ms *mongoMigrator) setV2Account(x *v2Account) (err error) {
_, err = v1ms.mgoDB.DB().Collection(v2AccountsCol).InsertOne(v1ms.mgoDB.GetContext(), x)
return
}
//rem
func (v1ms *mongoMigrator) remV2Account(id string) (err error) {
_, err = v1ms.mgoDB.DB().Collection(v2AccountsCol).DeleteOne(v1ms.mgoDB.GetContext(), bson.M{"id": id})
return
}
//Action methods
//get
func (v1ms *mongoMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
var strct *AtKeyValue
if v1ms.cursor == nil {
var cursor mongo.Cursor
cursor, err = v1ms.mgoDB.DB().Collection("actiontimings").Find(v1ms.mgoDB.GetContext(), nil)
if err != nil {
return nil, err
}
v1ms.cursor = &cursor
}
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
v1ms.cursor = nil
return nil, utils.ErrNoMoreData
}
if err := (*v1ms.cursor).Decode(strct); err != nil {
return nil, err
}
return &strct.Value, nil
}
//set
func (v1ms *mongoMigrator) setV1ActionPlans(x *v1ActionPlans) (err error) {
key := utils.ACTION_PLAN_PREFIX + (*x)[0].Id
_, err = v1ms.mgoDB.DB().Collection("actiontimings").InsertOne(v1ms.mgoDB.GetContext(), &AtKeyValue{key, *x})
return
}
//Actions methods
//get
func (v1ms *mongoMigrator) getV1Actions() (v1acs *v1Actions, err error) {
var strct *AcKeyValue
if v1ms.cursor == nil {
var cursor mongo.Cursor
cursor, err = v1ms.mgoDB.DB().Collection("actions").Find(v1ms.mgoDB.GetContext(), nil)
if err != nil {
return nil, err
}
v1ms.cursor = &cursor
}
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
v1ms.cursor = nil
return nil, utils.ErrNoMoreData
}
if err := (*v1ms.cursor).Decode(strct); err != nil {
return nil, err
}
return &strct.Value, nil
}
//set
func (v1ms *mongoMigrator) setV1Actions(x *v1Actions) (err error) {
key := utils.ACTION_PREFIX + (*x)[0].Id
_, err = v1ms.mgoDB.DB().Collection("actions").InsertOne(v1ms.mgoDB.GetContext(), &AcKeyValue{key, *x})
return
}
//ActionTriggers methods
//get
func (v1ms *mongoMigrator) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {
return nil, utils.ErrNotImplemented
}
//set
func (v1ms *mongoMigrator) setV1ActionTriggers(x *v1ActionTriggers) (err error) {
return utils.ErrNotImplemented
}
//Actions methods
//get
func (v1ms *mongoMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
if v1ms.cursor == nil {
var cursor mongo.Cursor
cursor, err = v1ms.mgoDB.DB().Collection(utils.SHARED_GROUP_PREFIX).Find(v1ms.mgoDB.GetContext(), nil)
if err != nil {
return nil, err
}
v1ms.cursor = &cursor
}
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
v1ms.cursor = nil
return nil, utils.ErrNoMoreData
}
if err := (*v1ms.cursor).Decode(v1sg); err != nil {
return nil, err
}
return v1sg, nil
}
//set
func (v1ms *mongoMigrator) setV1SharedGroup(x *v1SharedGroup) (err error) {
_, err = v1ms.mgoDB.DB().Collection(utils.SHARED_GROUP_PREFIX).InsertOne(v1ms.mgoDB.GetContext(), x)
return
}
//Stats methods
//get
func (v1ms *mongoMigrator) getV1Stats() (v1st *v1Stat, err error) {
if v1ms.cursor == nil {
var cursor mongo.Cursor
cursor, err = v1ms.mgoDB.DB().Collection(utils.CDR_STATS_PREFIX).Find(v1ms.mgoDB.GetContext(), nil)
if err != nil {
return nil, err
}
v1ms.cursor = &cursor
}
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
v1ms.cursor = nil
return nil, utils.ErrNoMoreData
}
if err := (*v1ms.cursor).Decode(v1st); err != nil {
return nil, err
}
return v1st, nil
}
//set
func (v1ms *mongoMigrator) setV1Stats(x *v1Stat) (err error) {
_, err = v1ms.mgoDB.DB().Collection(utils.CDR_STATS_PREFIX).InsertOne(v1ms.mgoDB.GetContext(), x)
return
}
//Stats methods
//get
func (v1ms *mongoMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
if v1ms.cursor == nil {
var cursor mongo.Cursor
cursor, err = v1ms.mgoDB.DB().Collection(v1ActionTriggersCol).Find(v1ms.mgoDB.GetContext(), nil)
if err != nil {
return nil, err
}
v1ms.cursor = &cursor
}
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
v1ms.cursor = nil
return nil, utils.ErrNoMoreData
}
if err := (*v1ms.cursor).Decode(v2at); err != nil {
return nil, err
}
return v2at, nil
}
//set
func (v1ms *mongoMigrator) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
_, err = v1ms.mgoDB.DB().Collection(v1ActionTriggersCol).InsertOne(v1ms.mgoDB.GetContext(), x)
return
}
//AttributeProfile methods
//get
func (v1ms *mongoMigrator) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) {
if v1ms.cursor == nil {
var cursor mongo.Cursor
cursor, err = v1ms.mgoDB.DB().Collection(v1AttributeProfilesCol).Find(v1ms.mgoDB.GetContext(), nil)
if err != nil {
return nil, err
}
v1ms.cursor = &cursor
}
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
v1ms.cursor = nil
return nil, utils.ErrNoMoreData
}
if err := (*v1ms.cursor).Decode(v1attrPrf); err != nil {
return nil, err
}
return v1attrPrf, nil
}
//set
func (v1ms *mongoMigrator) setV1AttributeProfile(x *v1AttributeProfile) (err error) {
_, err = v1ms.mgoDB.DB().Collection(v1AttributeProfilesCol).InsertOne(v1ms.mgoDB.GetContext(), x)
return
}
//ThresholdProfile methods
//get
func (v1ms *mongoMigrator) getV2ThresholdProfile() (v2T *v2Threshold, err error) {
if v1ms.cursor == nil {
var cursor mongo.Cursor
cursor, err = v1ms.mgoDB.DB().Collection(v2ThresholdProfileCol).Find(v1ms.mgoDB.GetContext(), nil)
if err != nil {
return nil, err
}
v1ms.cursor = &cursor
}
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
v1ms.cursor = nil
return nil, utils.ErrNoMoreData
}
if err := (*v1ms.cursor).Decode(v2T); err != nil {
return nil, err
}
return v2T, nil
}
//set
func (v1ms *mongoMigrator) setV2ThresholdProfile(x *v2Threshold) (err error) {
_, err = v1ms.mgoDB.DB().Collection(v2ThresholdProfileCol).InsertOne(v1ms.mgoDB.GetContext(), x)
return
}
//rem
func (v1ms *mongoMigrator) remV2ThresholdProfile(tenant, id string) (err error) {
_, err = v1ms.mgoDB.DB().Collection(v2ThresholdProfileCol).DeleteOne(v1ms.mgoDB.GetContext(), bson.M{"tenant": tenant, "id": id})
return
}

View File

@@ -0,0 +1,125 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
// "github.com/cgrates/mgo"
// "github.com/cgrates/mgo/bson"
"github.com/mongodb/mongo-go-driver/bson"
"github.com/mongodb/mongo-go-driver/mongo"
)
func newMongoStorDBMigrator(stor engine.StorDB) (mgoMig *mongoStorDBMigrator) {
return &mongoStorDBMigrator{
storDB: &stor,
mgoDB: stor.(*engine.MongoStorage),
cursor: nil,
}
}
type mongoStorDBMigrator struct {
storDB *engine.StorDB
mgoDB *engine.MongoStorage
cursor *mongo.Cursor
}
func (mgoMig *mongoStorDBMigrator) StorDB() engine.StorDB {
return *mgoMig.storDB
}
//CDR methods
//get
func (v1ms *mongoStorDBMigrator) getV1CDR() (v1Cdr *v1Cdrs, err error) {
if v1ms.cursor == nil {
var cursor mongo.Cursor
cursor, err = v1ms.mgoDB.DB().Collection(engine.ColCDRs).Find(v1ms.mgoDB.GetContext(), nil)
if err != nil {
return nil, err
}
v1ms.cursor = &cursor
}
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
v1ms.cursor = nil
return nil, utils.ErrNoMoreData
}
if err := (*v1ms.cursor).Decode(v1Cdr); err != nil {
return nil, err
}
return v1Cdr, nil
}
//set
func (v1ms *mongoStorDBMigrator) setV1CDR(v1Cdr *v1Cdrs) (err error) {
_, err = v1ms.mgoDB.DB().Collection(engine.ColCDRs).InsertOne(v1ms.mgoDB.GetContext(), v1Cdr)
return
}
//SMCost methods
//rename
func (v1ms *mongoStorDBMigrator) renameV1SMCosts() (err error) {
if err = v1ms.mgoDB.DB().Collection(utils.OldSMCosts).Drop(v1ms.mgoDB.GetContext()); err != nil {
return err
}
return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(),
bson.D{{"create", utils.SessionsCostsTBL}}).Err()
}
func (v1ms *mongoStorDBMigrator) createV1SMCosts() (err error) {
v1ms.mgoDB.DB().Collection(utils.OldSMCosts).Drop(v1ms.mgoDB.GetContext())
v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).Drop(v1ms.mgoDB.GetContext())
return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(),
bson.D{{"create", utils.OldSMCosts}, {"size", 1024}}).Err()
}
//get
func (v1ms *mongoStorDBMigrator) getV2SMCost() (v2Cost *v2SessionsCost, err error) {
if v1ms.cursor == nil {
var cursor mongo.Cursor
cursor, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).Find(v1ms.mgoDB.GetContext(), nil)
if err != nil {
return nil, err
}
v1ms.cursor = &cursor
}
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
v1ms.cursor = nil
return nil, utils.ErrNoMoreData
}
if err := (*v1ms.cursor).Decode(v2Cost); err != nil {
return nil, err
}
return v2Cost, nil
}
//set
func (v1ms *mongoStorDBMigrator) setV2SMCost(v2Cost *v2SessionsCost) (err error) {
_, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).InsertOne(v1ms.mgoDB.GetContext(), v2Cost)
return
}
//remove
func (v1ms *mongoStorDBMigrator) remV2SMCost(v2Cost *v2SessionsCost) (err error) {
_, err = v1ms.mgoDB.DB().Collection(v1AccountDBPrefix).DeleteMany(v1ms.mgoDB.GetContext(), nil)
return
}