mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-24 16:48:45 +05:00
Upgrade MongoDB driver to 1.13
- Set (but comment) serverAPI options (currently distinct api and create.size BSON field are deprecated + possible others that are untested) - Remove the custom time decoder used for mongo BSON datetime values. The custom decoder was only converting these values into UTC and was not any different from the default time.Time decoder in the MongoDB driver, which also handles BSON string, int64, and document values. - Implement 'buildURL' function to connect to mongo (can also be used for mysql and postgres) - Update function names, variable names, and comments for clarity - Replace 'bsonx.Regex' with the Regex primitive (deprecated since 1.12). - Use simple concatenation instead of Sprintf - Declare 'decimalType' locally, replace global 'decimalType' - Simplify several functions without altering functionality - Converting directly from a D to an M is deprecated. We are now decoding directly in a M. - Used errors.As and errors.Is for proper error comparison and assertion - Revised sloppy reassignments and added missing error checks
This commit is contained in:
committed by
Dan Christian Bogos
parent
e5ab21def5
commit
26cdb571b8
@@ -19,8 +19,14 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package engine
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"reflect"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/bson/bsoncodec"
|
||||
"go.mongodb.org/mongo-driver/bson/bsonrw"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
@@ -30,17 +36,30 @@ const (
|
||||
)
|
||||
|
||||
func (ms *MongoStorage) GetSection(ctx *context.Context, section string, val any) error {
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur := ms.getCol(ColCfg).FindOne(sctx, bson.M{"section": section},
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) error {
|
||||
sr := ms.getCol(ColCfg).FindOne(sctx, bson.M{"section": section},
|
||||
options.FindOne().SetProjection(bson.M{"cfg": 1, "_id": 0 /*"section": 0, */}))
|
||||
tmp := map[string]bson.Raw{}
|
||||
if err = cur.Decode(&tmp); err != nil {
|
||||
if err == mongo.ErrNoDocuments {
|
||||
decodeErr := sr.Decode(&tmp)
|
||||
if decodeErr != nil {
|
||||
if errors.Is(decodeErr, mongo.ErrNoDocuments) {
|
||||
return nil
|
||||
}
|
||||
return
|
||||
return decodeErr
|
||||
}
|
||||
return bson.UnmarshalWithRegistry(mongoReg, tmp["cfg"], val)
|
||||
reg := bson.NewRegistry()
|
||||
decimalType := reflect.TypeOf(utils.Decimal{})
|
||||
reg.RegisterTypeEncoder(decimalType, bsoncodec.ValueEncoderFunc(decimalEncoder))
|
||||
reg.RegisterTypeDecoder(decimalType, bsoncodec.ValueDecoderFunc(decimalDecoder))
|
||||
|
||||
dec, err := bson.NewDecoder(bsonrw.NewBSONDocumentReader(tmp["cfg"]))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = dec.SetRegistry(reg); err != nil {
|
||||
return err
|
||||
}
|
||||
return dec.Decode(val)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -20,7 +20,6 @@ package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -31,765 +30,8 @@ import (
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"go.mongodb.org/mongo-driver/x/bsonx"
|
||||
)
|
||||
|
||||
func (ms *MongoStorage) GetTpIds(colName string) (tpids []string, err error) {
|
||||
getTpIDs := func(ctx mongo.SessionContext, col string, tpMap utils.StringSet) (utils.StringSet, error) {
|
||||
if strings.HasPrefix(col, "tp_") {
|
||||
result, err := ms.getCol(col).Distinct(ctx, "tpid", bson.D{})
|
||||
if err != nil {
|
||||
return tpMap, err
|
||||
}
|
||||
for _, tpid := range result {
|
||||
tpMap.Add(tpid.(string))
|
||||
}
|
||||
}
|
||||
return tpMap, nil
|
||||
}
|
||||
tpidMap := make(utils.StringSet)
|
||||
|
||||
if colName == "" {
|
||||
if err := ms.query(context.TODO(), func(sctx mongo.SessionContext) error {
|
||||
col, err := ms.DB().ListCollections(sctx, bson.D{}, options.ListCollections().SetNameOnly(true))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for col.Next(sctx) {
|
||||
var elem struct{ Name string }
|
||||
if err := col.Decode(&elem); err != nil {
|
||||
return err
|
||||
}
|
||||
if tpidMap, err = getTpIDs(sctx, elem.Name, tpidMap); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return col.Close(sctx)
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
if err := ms.query(context.TODO(), func(sctx mongo.SessionContext) error {
|
||||
tpidMap, err = getTpIDs(sctx, colName, tpidMap)
|
||||
return err
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
tpids = tpidMap.AsSlice()
|
||||
return tpids, nil
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinct []string,
|
||||
filter map[string]string, pag *utils.PaginatorWithSearch) ([]string, error) {
|
||||
findMap := bson.M{}
|
||||
if tpid != "" {
|
||||
findMap["tpid"] = tpid
|
||||
}
|
||||
for k, v := range filter {
|
||||
if k != "" && v != "" {
|
||||
findMap[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
fop := options.Find()
|
||||
if pag != nil {
|
||||
if pag.Search != "" {
|
||||
var searchItems []bson.M
|
||||
for _, d := range distinct {
|
||||
searchItems = append(searchItems, bson.M{d: bsonx.Regex(".*"+regexp.QuoteMeta(pag.Search)+".*", "")})
|
||||
}
|
||||
// findMap["$and"] = []bson.M{{"$or": searchItems}} //before
|
||||
findMap["$or"] = searchItems // after
|
||||
}
|
||||
if pag.Paginator != nil {
|
||||
if pag.Limit != nil {
|
||||
fop = fop.SetLimit(int64(*pag.Limit))
|
||||
}
|
||||
if pag.Offset != nil {
|
||||
fop = fop.SetSkip(int64(*pag.Offset))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
selectors := bson.M{"_id": 0}
|
||||
for i, d := range distinct {
|
||||
if d == "tag" { // convert the tag used in SQL into id used here
|
||||
distinct[i] = "id"
|
||||
}
|
||||
selectors[distinct[i]] = 1
|
||||
}
|
||||
fop.SetProjection(selectors)
|
||||
|
||||
distinctIds := make(utils.StringSet)
|
||||
if err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(table).Find(sctx, findMap, fop)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var elem bson.D
|
||||
err := cur.Decode(&elem)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
item := elem.Map()
|
||||
|
||||
var id string
|
||||
last := len(distinct) - 1
|
||||
for i, d := range distinct {
|
||||
if distinctValue, ok := item[d]; ok {
|
||||
id += distinctValue.(string)
|
||||
}
|
||||
if i < last {
|
||||
id += utils.ConcatenatedKeySep
|
||||
}
|
||||
}
|
||||
distinctIds.Add(id)
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return distinctIds.AsSlice(), nil
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPResources(tpid, tenant, id string) ([]*utils.TPResourceProfile, error) {
|
||||
filter := bson.M{"tpid": tpid}
|
||||
if id != "" {
|
||||
filter["id"] = id
|
||||
}
|
||||
if tenant != "" {
|
||||
filter["tenant"] = tenant
|
||||
}
|
||||
var results []*utils.TPResourceProfile
|
||||
err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.TBLTPResources).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var el utils.TPResourceProfile
|
||||
err := cur.Decode(&el)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
results = append(results, &el)
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPStats(tpid, tenant, id string) ([]*utils.TPStatProfile, error) {
|
||||
filter := bson.M{
|
||||
"tpid": tpid,
|
||||
}
|
||||
if id != "" {
|
||||
filter["id"] = id
|
||||
}
|
||||
if tenant != "" {
|
||||
filter["tenant"] = tenant
|
||||
}
|
||||
var results []*utils.TPStatProfile
|
||||
err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.TBLTPStats).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var el utils.TPStatProfile
|
||||
err := cur.Decode(&el)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
results = append(results, &el)
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) RemTpData(table, tpid string, args map[string]string) error {
|
||||
if len(table) == 0 { // Remove tpid out of all tables
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) error {
|
||||
col, err := ms.DB().ListCollections(sctx, bson.D{}, options.ListCollections().SetNameOnly(true))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for col.Next(sctx) {
|
||||
var elem struct{ Name string }
|
||||
if err := col.Decode(&elem); err != nil {
|
||||
return err
|
||||
}
|
||||
if strings.HasPrefix(elem.Name, "tp_") {
|
||||
_, err = ms.getCol(elem.Name).DeleteMany(sctx, bson.M{"tpid": tpid})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return col.Close(sctx)
|
||||
})
|
||||
}
|
||||
// Remove from a single table
|
||||
if args == nil {
|
||||
args = make(map[string]string)
|
||||
}
|
||||
|
||||
if _, has := args["tag"]; has { // API uses tag to be compatible with SQL models, fix it here
|
||||
args["id"] = args["tag"]
|
||||
delete(args, "tag")
|
||||
}
|
||||
if tpid != "" {
|
||||
args["tpid"] = tpid
|
||||
}
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
dr, err := ms.getCol(table).DeleteOne(sctx, args)
|
||||
if dr.DeletedCount == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPResources(tpRLs []*utils.TPResourceProfile) (err error) {
|
||||
if len(tpRLs) == 0 {
|
||||
return
|
||||
}
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tpRLs {
|
||||
_, err = ms.getCol(utils.TBLTPResources).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
|
||||
bson.M{"$set": tp}, options.Update().SetUpsert(true))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPRStats(tps []*utils.TPStatProfile) (err error) {
|
||||
if len(tps) == 0 {
|
||||
return
|
||||
}
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tps {
|
||||
_, err = ms.getCol(utils.TBLTPStats).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
|
||||
bson.M{"$set": tp}, options.Update().SetUpsert(true))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPStats(tpSTs []*utils.TPStatProfile) (err error) {
|
||||
if len(tpSTs) == 0 {
|
||||
return
|
||||
}
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tpSTs {
|
||||
_, err = ms.getCol(utils.TBLTPStats).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
|
||||
bson.M{"$set": tp},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPThresholds(tpid, tenant, id string) ([]*utils.TPThresholdProfile, error) {
|
||||
filter := bson.M{"tpid": tpid}
|
||||
if id != "" {
|
||||
filter["id"] = id
|
||||
}
|
||||
if tenant != "" {
|
||||
filter["tenant"] = tenant
|
||||
}
|
||||
var results []*utils.TPThresholdProfile
|
||||
err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.TBLTPThresholds).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var tp utils.TPThresholdProfile
|
||||
err := cur.Decode(&tp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
results = append(results, &tp)
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPThresholds(tpTHs []*utils.TPThresholdProfile) (err error) {
|
||||
if len(tpTHs) == 0 {
|
||||
return
|
||||
}
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tpTHs {
|
||||
_, err = ms.getCol(utils.TBLTPThresholds).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
|
||||
bson.M{"$set": tp},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPFilters(tpid, tenant, id string) ([]*utils.TPFilterProfile, error) {
|
||||
filter := bson.M{"tpid": tpid}
|
||||
if id != "" {
|
||||
filter["id"] = id
|
||||
}
|
||||
if tenant != "" {
|
||||
filter["tenant"] = tenant
|
||||
}
|
||||
results := []*utils.TPFilterProfile{}
|
||||
err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.TBLTPFilters).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var tp utils.TPFilterProfile
|
||||
err := cur.Decode(&tp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
results = append(results, &tp)
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPFilters(tpTHs []*utils.TPFilterProfile) (err error) {
|
||||
if len(tpTHs) == 0 {
|
||||
return
|
||||
}
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tpTHs {
|
||||
_, err = ms.getCol(utils.TBLTPFilters).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
|
||||
bson.M{"$set": tp},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPRoutes(tpid, tenant, id string) ([]*utils.TPRouteProfile, error) {
|
||||
filter := bson.M{"tpid": tpid}
|
||||
if id != "" {
|
||||
filter["id"] = id
|
||||
}
|
||||
if tenant != "" {
|
||||
filter["tenant"] = tenant
|
||||
}
|
||||
var results []*utils.TPRouteProfile
|
||||
err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.TBLTPRoutes).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var tp utils.TPRouteProfile
|
||||
err := cur.Decode(&tp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
results = append(results, &tp)
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPRoutes(tpRoutes []*utils.TPRouteProfile) (err error) {
|
||||
if len(tpRoutes) == 0 {
|
||||
return
|
||||
}
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tpRoutes {
|
||||
_, err = ms.getCol(utils.TBLTPRoutes).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
|
||||
bson.M{"$set": tp},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPAttributes(tpid, tenant, id string) ([]*utils.TPAttributeProfile, error) {
|
||||
filter := bson.M{"tpid": tpid}
|
||||
if id != "" {
|
||||
filter["id"] = id
|
||||
}
|
||||
if tenant != "" {
|
||||
filter["tenant"] = tenant
|
||||
}
|
||||
var results []*utils.TPAttributeProfile
|
||||
err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.TBLTPAttributes).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var tp utils.TPAttributeProfile
|
||||
err := cur.Decode(&tp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
results = append(results, &tp)
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPAttributes(tpRoutes []*utils.TPAttributeProfile) (err error) {
|
||||
if len(tpRoutes) == 0 {
|
||||
return
|
||||
}
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tpRoutes {
|
||||
_, err = ms.getCol(utils.TBLTPAttributes).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
|
||||
bson.M{"$set": tp},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPChargers(tpid, tenant, id string) ([]*utils.TPChargerProfile, error) {
|
||||
filter := bson.M{"tpid": tpid}
|
||||
if id != "" {
|
||||
filter["id"] = id
|
||||
}
|
||||
if tenant != "" {
|
||||
filter["tenant"] = tenant
|
||||
}
|
||||
var results []*utils.TPChargerProfile
|
||||
err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.TBLTPChargers).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var tp utils.TPChargerProfile
|
||||
err := cur.Decode(&tp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
results = append(results, &tp)
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPChargers(tpCPP []*utils.TPChargerProfile) (err error) {
|
||||
if len(tpCPP) == 0 {
|
||||
return
|
||||
}
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tpCPP {
|
||||
_, err = ms.getCol(utils.TBLTPChargers).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
|
||||
bson.M{"$set": tp},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPDispatcherProfiles(tpid, tenant, id string) ([]*utils.TPDispatcherProfile, error) {
|
||||
filter := bson.M{"tpid": tpid}
|
||||
if id != "" {
|
||||
filter["id"] = id
|
||||
}
|
||||
if tenant != "" {
|
||||
filter["tenant"] = tenant
|
||||
}
|
||||
var results []*utils.TPDispatcherProfile
|
||||
err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.TBLTPDispatchers).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var tp utils.TPDispatcherProfile
|
||||
err := cur.Decode(&tp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
results = append(results, &tp)
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPDispatcherProfiles(tpDPPs []*utils.TPDispatcherProfile) (err error) {
|
||||
if len(tpDPPs) == 0 {
|
||||
return
|
||||
}
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tpDPPs {
|
||||
_, err = ms.getCol(utils.TBLTPDispatchers).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
|
||||
bson.M{"$set": tp},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPDispatcherHosts(tpid, tenant, id string) ([]*utils.TPDispatcherHost, error) {
|
||||
filter := bson.M{"tpid": tpid}
|
||||
if id != "" {
|
||||
filter["id"] = id
|
||||
}
|
||||
if tenant != "" {
|
||||
filter["tenant"] = tenant
|
||||
}
|
||||
var results []*utils.TPDispatcherHost
|
||||
err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.TBLTPDispatcherHosts).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var tp utils.TPDispatcherHost
|
||||
err := cur.Decode(&tp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
results = append(results, &tp)
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPDispatcherHosts(tpDPPs []*utils.TPDispatcherHost) (err error) {
|
||||
if len(tpDPPs) == 0 {
|
||||
return
|
||||
}
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tpDPPs {
|
||||
_, err = ms.getCol(utils.TBLTPDispatcherHosts).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
|
||||
bson.M{"$set": tp},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPRateProfiles(tpid, tenant, id string) ([]*utils.TPRateProfile, error) {
|
||||
filter := bson.M{"tpid": tpid}
|
||||
if id != "" {
|
||||
filter["id"] = id
|
||||
}
|
||||
if tenant != "" {
|
||||
filter["tenant"] = tenant
|
||||
}
|
||||
var results []*utils.TPRateProfile
|
||||
err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.TBLTPRateProfiles).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var tp utils.TPRateProfile
|
||||
err := cur.Decode(&tp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
results = append(results, &tp)
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPRateProfiles(tpDPPs []*utils.TPRateProfile) (err error) {
|
||||
if len(tpDPPs) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tpDPPs {
|
||||
_, err = ms.getCol(utils.TBLTPRateProfiles).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
|
||||
bson.M{"$set": tp},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPActionProfiles(tpid, tenant, id string) ([]*utils.TPActionProfile, error) {
|
||||
filter := bson.M{"tpid": tpid}
|
||||
if id != "" {
|
||||
filter["id"] = id
|
||||
}
|
||||
if tenant != "" {
|
||||
filter["tenant"] = tenant
|
||||
}
|
||||
var results []*utils.TPActionProfile
|
||||
err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.TBLTPActionProfiles).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var tp utils.TPActionProfile
|
||||
err := cur.Decode(&tp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
results = append(results, &tp)
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPAccounts(tpid, tenant, id string) ([]*utils.TPAccount, error) {
|
||||
filter := bson.M{"tpid": tpid}
|
||||
if id != "" {
|
||||
filter["id"] = id
|
||||
}
|
||||
if tenant != "" {
|
||||
filter["tenant"] = tenant
|
||||
}
|
||||
var results []*utils.TPAccount
|
||||
err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.TBLTPAccounts).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var tp utils.TPAccount
|
||||
err := cur.Decode(&tp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
results = append(results, &tp)
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPActionProfiles(tpAps []*utils.TPActionProfile) (err error) {
|
||||
if len(tpAps) == 0 {
|
||||
return
|
||||
}
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tpAps {
|
||||
_, err = ms.getCol(utils.TBLTPActionProfiles).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
|
||||
bson.M{"$set": tp},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPAccounts(tpAps []*utils.TPAccount) (err error) {
|
||||
if len(tpAps) == 0 {
|
||||
return
|
||||
}
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tpAps {
|
||||
_, err = ms.getCol(utils.TBLTPAccounts).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
|
||||
bson.M{"$set": tp},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetVersions(itm string) (vrs Versions, err error) {
|
||||
fop := options.FindOne()
|
||||
if itm != "" {
|
||||
@@ -864,7 +106,7 @@ func (ms *MongoStorage) GetStorageType() string {
|
||||
|
||||
func (ms *MongoStorage) SetCDR(cdr *utils.CGREvent, allowUpdate bool) error {
|
||||
if val, has := cdr.Event[utils.OrderID]; has && val == 0 {
|
||||
cdr.Event[utils.OrderID] = ms.cnter.Next()
|
||||
cdr.Event[utils.OrderID] = ms.counter.Next()
|
||||
}
|
||||
cdrTable := &CDR{
|
||||
Tenant: cdr.Tenant,
|
||||
@@ -998,14 +240,18 @@ func getQueryType(ruleType string, not bool, values []string) (msQuery string, v
|
||||
if not {
|
||||
msQuery = "$nin"
|
||||
}
|
||||
regex := make([]bsonx.Val, 0, len(values))
|
||||
regex := make([]primitive.Regex, 0, len(values))
|
||||
if ruleType == utils.MetaPrefix || ruleType == utils.MetaNotPrefix {
|
||||
for _, val := range values {
|
||||
regex = append(regex, bsonx.Regex("/^"+val+"/", utils.EmptyString))
|
||||
regex = append(regex, primitive.Regex{
|
||||
Pattern: "/^" + val + "/",
|
||||
})
|
||||
}
|
||||
} else {
|
||||
for _, val := range values {
|
||||
regex = append(regex, bsonx.Regex("/"+val+"$/", utils.EmptyString))
|
||||
regex = append(regex, primitive.Regex{
|
||||
Pattern: "/" + val + "$/",
|
||||
})
|
||||
}
|
||||
}
|
||||
valChanged = regex
|
||||
|
||||
@@ -20,6 +20,9 @@ package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
@@ -79,3 +82,21 @@ func NewStorDBConn(dbType, host, port, name, user, pass, marshaler string,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func buildURL(scheme, host, port, db, user, pass string) (*url.URL, error) {
|
||||
u, err := url.Parse("//" + host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if port != "0" {
|
||||
u.Host = net.JoinHostPort(u.Host, port)
|
||||
}
|
||||
if user != "" && pass != "" {
|
||||
u.User = url.UserPassword(user, pass)
|
||||
}
|
||||
if db != "" {
|
||||
u.Path = path.Join(u.Path, db)
|
||||
}
|
||||
u.Scheme = scheme
|
||||
return u, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user