mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Update mongo driver
This commit is contained in:
committed by
Dan Christian Bogos
parent
a49663371d
commit
d1481fb46c
@@ -41,7 +41,6 @@ import (
|
||||
"github.com/mongodb/mongo-go-driver/mongo"
|
||||
"github.com/mongodb/mongo-go-driver/mongo/options"
|
||||
"github.com/mongodb/mongo-go-driver/x/bsonx"
|
||||
"github.com/mongodb/mongo-go-driver/x/mongo/driver/topology"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -143,16 +142,7 @@ func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes
|
||||
ctx := context.Background()
|
||||
url = "mongodb://" + url
|
||||
reg := bson.NewRegistryBuilder().RegisterDecoder(tTime, bsoncodec.ValueDecoderFunc(TimeDecodeValue1)).Build()
|
||||
opt := &options.ClientOptions{
|
||||
Registry: reg,
|
||||
TopologyOptions: []topology.Option{
|
||||
topology.WithServerOptions(func(opts ...topology.ServerOption) []topology.ServerOption {
|
||||
return []topology.ServerOption{
|
||||
topology.WithRegistry(func(r *bsoncodec.Registry) *bsoncodec.Registry { return reg }),
|
||||
}
|
||||
}),
|
||||
},
|
||||
}
|
||||
opt := options.Client().SetRegistry(reg)
|
||||
|
||||
client, err := mongo.NewClientWithOptions(url, opt)
|
||||
// client, err := mongo.NewClient(url)
|
||||
@@ -175,7 +165,7 @@ func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes
|
||||
isDataDB: isDataDB,
|
||||
}
|
||||
if err = ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) error {
|
||||
if col, err := ms.client.Database(dbName).ListCollections(sctx, nil, options.ListCollections().SetNameOnly(true)); err != nil {
|
||||
if col, err := ms.client.Database(dbName).ListCollections(sctx, bson.D{}, options.ListCollections().SetNameOnly(true)); err != nil {
|
||||
return err
|
||||
} else {
|
||||
empty := true
|
||||
@@ -225,14 +215,14 @@ func (ms *MongoStorage) IsDataDB() bool {
|
||||
func (ms *MongoStorage) EnusureIndex(colName string, uniq bool, keys ...string) error {
|
||||
return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) error {
|
||||
col := ms.getCol(colName)
|
||||
io := mongo.NewIndexOptionsBuilder().Unique(uniq)
|
||||
io := options.Index().SetUnique(uniq)
|
||||
var doc bsonx.Doc
|
||||
for _, k := range keys {
|
||||
doc = doc.Append(k, bsonx.Int32(1))
|
||||
}
|
||||
_, err := col.Indexes().CreateOne(sctx, mongo.IndexModel{
|
||||
Keys: doc,
|
||||
Options: io.Build(),
|
||||
Options: io,
|
||||
})
|
||||
return err
|
||||
})
|
||||
@@ -523,7 +513,7 @@ func (ms *MongoStorage) RemoveReverseForPrefix(prefix string) (err error) {
|
||||
// IsDBEmpty implementation
|
||||
func (ms *MongoStorage) IsDBEmpty() (resp bool, err error) {
|
||||
err = ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) error {
|
||||
col, err := ms.DB().ListCollections(sctx, nil)
|
||||
col, err := ms.DB().ListCollections(sctx, bson.D{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1151,7 +1141,7 @@ func (ms *MongoStorage) GetSubscribersDrv() (result map[string]*SubscriberData,
|
||||
Value *SubscriberData
|
||||
}
|
||||
if err = ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(colPbs).Find(sctx, nil)
|
||||
cur, err := ms.getCol(colPbs).Find(sctx, bson.D{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1214,7 +1204,7 @@ func (ms *MongoStorage) GetUserDrv(key string) (up *UserProfile, err error) {
|
||||
|
||||
func (ms *MongoStorage) GetUsersDrv() (result []*UserProfile, err error) {
|
||||
err = ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(colUsr).Find(sctx, nil)
|
||||
cur, err := ms.getCol(colUsr).Find(sctx, bson.D{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1802,7 +1792,7 @@ func (ms *MongoStorage) PopTask() (t *Task, err error) {
|
||||
Task *Task
|
||||
}{}
|
||||
if err = ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
cur := ms.getCol(colTsk).FindOneAndDelete(sctx, nil)
|
||||
cur := ms.getCol(colTsk).FindOneAndDelete(sctx, bson.D{})
|
||||
if err := cur.Decode(&v); err != nil {
|
||||
if err == mongo.ErrNoDocuments {
|
||||
return utils.ErrNotFound
|
||||
|
||||
@@ -36,7 +36,7 @@ import (
|
||||
func (ms *MongoStorage) GetTpIds(colName string) (tpids []string, err error) {
|
||||
getTpIDs := func(ctx context.Context, col string, tpMap map[string]struct{}) (map[string]struct{}, error) {
|
||||
if strings.HasPrefix(col, "tp_") {
|
||||
result, err := ms.getCol(col).Distinct(ctx, "tpid", nil)
|
||||
result, err := ms.getCol(col).Distinct(ctx, "tpid", bson.D{})
|
||||
if err != nil {
|
||||
return tpMap, err
|
||||
}
|
||||
@@ -50,7 +50,7 @@ func (ms *MongoStorage) GetTpIds(colName string) (tpids []string, err error) {
|
||||
|
||||
if colName == "" {
|
||||
if err := ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) error {
|
||||
col, err := ms.DB().ListCollections(sctx, nil, options.ListCollections().SetNameOnly(true))
|
||||
col, err := ms.DB().ListCollections(sctx, bson.D{}, options.ListCollections().SetNameOnly(true))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -671,7 +671,7 @@ func (ms *MongoStorage) GetTPAccountActions(tp *utils.TPAccountActions) ([]*util
|
||||
func (ms *MongoStorage) RemTpData(table, tpid string, args map[string]string) error {
|
||||
if len(table) == 0 { // Remove tpid out of all tables
|
||||
return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) error {
|
||||
col, err := ms.DB().ListCollections(sctx, nil, options.ListCollections().SetNameOnly(true))
|
||||
col, err := ms.DB().ListCollections(sctx, bson.D{}, options.ListCollections().SetNameOnly(true))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1616,7 +1616,7 @@ func (ms *MongoStorage) GetVersions(itm string) (vrs Versions, err error) {
|
||||
fop.SetProjection(bson.M{"_id": 0})
|
||||
}
|
||||
if err = ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
cur := ms.getCol(colVer).FindOne(sctx, nil, fop)
|
||||
cur := ms.getCol(colVer).FindOne(sctx, bson.D{}, fop)
|
||||
if err := cur.Decode(&vrs); err != nil {
|
||||
if err == mongo.ErrNoDocuments {
|
||||
return utils.ErrNotFound
|
||||
@@ -1638,7 +1638,7 @@ func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) (err error) {
|
||||
ms.RemoveVersions(nil)
|
||||
}
|
||||
return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
_, err = ms.getCol(colVer).UpdateOne(sctx, nil, bson.M{"$set": vrs},
|
||||
_, err = ms.getCol(colVer).UpdateOne(sctx, bson.D{}, bson.M{"$set": vrs},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
return err
|
||||
@@ -1654,7 +1654,7 @@ func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) (err error) {
|
||||
func (ms *MongoStorage) RemoveVersions(vrs Versions) (err error) {
|
||||
if len(vrs) == 0 {
|
||||
return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
dr, err := ms.getCol(colVer).DeleteOne(sctx, nil)
|
||||
dr, err := ms.getCol(colVer).DeleteOne(sctx, bson.D{})
|
||||
if dr.DeletedCount == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
@@ -1663,7 +1663,7 @@ func (ms *MongoStorage) RemoveVersions(vrs Versions) (err error) {
|
||||
}
|
||||
return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
for k := range vrs {
|
||||
if _, err = ms.getCol(colVer).UpdateOne(sctx, nil, bson.M{"$unset": bson.M{k: 1}},
|
||||
if _, err = ms.getCol(colVer).UpdateOne(sctx, bson.D{}, bson.M{"$unset": bson.M{k: 1}},
|
||||
options.Update().SetUpsert(true)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
2
glide.lock
generated
2
glide.lock
generated
@@ -97,7 +97,7 @@ imports:
|
||||
- name: github.com/fsnotify/fsnotify
|
||||
version: ccc981bf80385c528a65fbfdd49bf2d8da22aa23
|
||||
- name: github.com/mongodb/mongo-go-driver
|
||||
version: 30999752086b83f8191ea5e6eb526a8bd63508cc
|
||||
version: 4f3b0a882e7d5f83c3d1ab4d0e530a51642fce03
|
||||
subpackages:
|
||||
- bson
|
||||
- bson/bsoncodec
|
||||
|
||||
@@ -65,7 +65,7 @@ func (mgoMig *mongoMigrator) DataManager() *engine.DataManager {
|
||||
func (v1ms *mongoMigrator) getv1Account() (v1Acnt *v1Account, err error) {
|
||||
if v1ms.cursor == nil {
|
||||
var cursor mongo.Cursor
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(v1AccountDBPrefix).Find(v1ms.mgoDB.GetContext(), nil)
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(v1AccountDBPrefix).Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -86,7 +86,9 @@ func (v1ms *mongoMigrator) getv1Account() (v1Acnt *v1Account, err error) {
|
||||
|
||||
//set
|
||||
func (v1ms *mongoMigrator) setV1Account(x *v1Account) (err error) {
|
||||
_, err = v1ms.mgoDB.DB().Collection(v1AccountDBPrefix).InsertOne(v1ms.mgoDB.GetContext(), x)
|
||||
if x != nil {
|
||||
_, err = v1ms.mgoDB.DB().Collection(v1AccountDBPrefix).InsertOne(v1ms.mgoDB.GetContext(), x)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -101,7 +103,7 @@ func (v1ms *mongoMigrator) remV1Account(id string) (err error) {
|
||||
func (v1ms *mongoMigrator) getv2Account() (v2Acnt *v2Account, err error) {
|
||||
if v1ms.cursor == nil {
|
||||
var cursor mongo.Cursor
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(v2AccountsCol).Find(v1ms.mgoDB.GetContext(), nil)
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(v2AccountsCol).Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -121,7 +123,9 @@ func (v1ms *mongoMigrator) getv2Account() (v2Acnt *v2Account, err error) {
|
||||
|
||||
//set
|
||||
func (v1ms *mongoMigrator) setV2Account(x *v2Account) (err error) {
|
||||
_, err = v1ms.mgoDB.DB().Collection(v2AccountsCol).InsertOne(v1ms.mgoDB.GetContext(), x)
|
||||
if x != nil {
|
||||
_, err = v1ms.mgoDB.DB().Collection(v2AccountsCol).InsertOne(v1ms.mgoDB.GetContext(), x)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -137,7 +141,7 @@ func (v1ms *mongoMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error)
|
||||
strct := new(AtKeyValue)
|
||||
if v1ms.cursor == nil {
|
||||
var cursor mongo.Cursor
|
||||
cursor, err = v1ms.mgoDB.DB().Collection("actiontimings").Find(v1ms.mgoDB.GetContext(), nil)
|
||||
cursor, err = v1ms.mgoDB.DB().Collection("actiontimings").Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -167,7 +171,7 @@ func (v1ms *mongoMigrator) getV1Actions() (v1acs *v1Actions, err error) {
|
||||
strct := new(AcKeyValue)
|
||||
if v1ms.cursor == nil {
|
||||
var cursor mongo.Cursor
|
||||
cursor, err = v1ms.mgoDB.DB().Collection("actions").Find(v1ms.mgoDB.GetContext(), nil)
|
||||
cursor, err = v1ms.mgoDB.DB().Collection("actions").Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -207,7 +211,7 @@ func (v1ms *mongoMigrator) setV1ActionTriggers(x *v1ActionTriggers) (err error)
|
||||
func (v1ms *mongoMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
|
||||
if v1ms.cursor == nil {
|
||||
var cursor mongo.Cursor
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(utils.SHARED_GROUP_PREFIX).Find(v1ms.mgoDB.GetContext(), nil)
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(utils.SHARED_GROUP_PREFIX).Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -236,7 +240,7 @@ func (v1ms *mongoMigrator) setV1SharedGroup(x *v1SharedGroup) (err error) {
|
||||
func (v1ms *mongoMigrator) getV1Stats() (v1st *v1Stat, err error) {
|
||||
if v1ms.cursor == nil {
|
||||
var cursor mongo.Cursor
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(utils.CDR_STATS_PREFIX).Find(v1ms.mgoDB.GetContext(), nil)
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(utils.CDR_STATS_PREFIX).Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -265,7 +269,7 @@ func (v1ms *mongoMigrator) setV1Stats(x *v1Stat) (err error) {
|
||||
func (v1ms *mongoMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
|
||||
if v1ms.cursor == nil {
|
||||
var cursor mongo.Cursor
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(v1ActionTriggersCol).Find(v1ms.mgoDB.GetContext(), nil)
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(v1ActionTriggersCol).Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -294,7 +298,7 @@ func (v1ms *mongoMigrator) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
|
||||
func (v1ms *mongoMigrator) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) {
|
||||
if v1ms.cursor == nil {
|
||||
var cursor mongo.Cursor
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(v1AttributeProfilesCol).Find(v1ms.mgoDB.GetContext(), nil)
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(v1AttributeProfilesCol).Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -323,7 +327,7 @@ func (v1ms *mongoMigrator) setV1AttributeProfile(x *v1AttributeProfile) (err err
|
||||
func (v1ms *mongoMigrator) getV2ThresholdProfile() (v2T *v2Threshold, err error) {
|
||||
if v1ms.cursor == nil {
|
||||
var cursor mongo.Cursor
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(v2ThresholdProfileCol).Find(v1ms.mgoDB.GetContext(), nil)
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(v2ThresholdProfileCol).Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -48,7 +48,7 @@ func (mgoMig *mongoStorDBMigrator) StorDB() engine.StorDB {
|
||||
func (v1ms *mongoStorDBMigrator) getV1CDR() (v1Cdr *v1Cdrs, err error) {
|
||||
if v1ms.cursor == nil {
|
||||
var cursor mongo.Cursor
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(engine.ColCDRs).Find(v1ms.mgoDB.GetContext(), nil)
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(engine.ColCDRs).Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -93,7 +93,7 @@ func (v1ms *mongoStorDBMigrator) createV1SMCosts() (err error) {
|
||||
func (v1ms *mongoStorDBMigrator) getV2SMCost() (v2Cost *v2SessionsCost, err error) {
|
||||
if v1ms.cursor == nil {
|
||||
var cursor mongo.Cursor
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).Find(v1ms.mgoDB.GetContext(), nil)
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -119,6 +119,6 @@ func (v1ms *mongoStorDBMigrator) setV2SMCost(v2Cost *v2SessionsCost) (err error)
|
||||
|
||||
//remove
|
||||
func (v1ms *mongoStorDBMigrator) remV2SMCost(v2Cost *v2SessionsCost) (err error) {
|
||||
_, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).DeleteMany(v1ms.mgoDB.GetContext(), nil)
|
||||
_, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).DeleteMany(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user