From 38578524da65e1a385292475ffb84a60b2427cec Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Fri, 27 Oct 2023 02:45:45 -0400 Subject: [PATCH] Upgrade MongoDB driver to v1.12 - Set (but comment) serverAPI options (currently distinct api and create.size BSON field are deprecated + possible others that are untested) - Remove the custom time decoder used for mongo BSON datetime values. The custom decoder was only converting these values into UTC and was not any different from the default time.Time decoder in the MongoDB driver, which also handles BSON string, int64, and document values. - Implement 'buildURL' function to connect to mongo (can also be used for mysql and postgres) - Update function names, variable names, and comments for clarity - Replace 'bsonx.Regex' with the Regex primitive for v1.12 compatibility - Use simple concatenation instead of Sprintf - Declare 'decimalType' locally, replace global 'decimalType' - Simplify several functions without altering functionality - Converting directly from a D to an M is deprecated. We are now decoding directly in a M. - Used errors.As and errors.Is for proper error comparison and assertion - Revised sloppy reassignments and added missing error checks --- engine/storage_mongo_datadb.go | 1460 ++++++++++++++++---------------- engine/storage_mongo_stordb.go | 317 +++---- engine/storage_utils.go | 21 + go.mod | 4 +- go.sum | 13 +- 5 files changed, 904 insertions(+), 911 deletions(-) diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 9f5c8d41f..d94d57c3f 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -22,6 +22,7 @@ import ( "bytes" "compress/zlib" "context" + "errors" "fmt" "io" "reflect" @@ -38,14 +39,12 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/bsoncodec" "go.mongodb.org/mongo-driver/bson/bsonrw" - "go.mongodb.org/mongo-driver/bson/bsontype" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/x/bsonx" ) -// Mongo collections names +// Collection names in MongoDB. const ( ColDst = "destinations" ColRds = "reverse_destinations" @@ -100,48 +99,43 @@ var ( DestinationLow = strings.ToLower(utils.Destination) CostLow = strings.ToLower(utils.Cost) CostSourceLow = strings.ToLower(utils.CostSource) - - tTime = reflect.TypeOf(time.Time{}) - decimalType = reflect.TypeOf(utils.Decimal{}) ) -func TimeDecodeValue1(dc bsoncodec.DecodeContext, vr bsonrw.ValueReader, val reflect.Value) error { - if vr.Type() != bsontype.DateTime { - return fmt.Errorf("cannot decode %v into a time.Time", vr.Type()) +func decimalEncoder(ec bsoncodec.EncodeContext, vw bsonrw.ValueWriter, val reflect.Value) error { + decimalType := reflect.TypeOf(utils.Decimal{}) + + // All encoder implementations should check that val is valid and is of + // the correct type before proceeding. + if !val.IsValid() || val.Type() != decimalType { + return bsoncodec.ValueEncoderError{ + Name: "decimalEncoder", + Types: []reflect.Type{decimalType}, + Received: val, + } } - dt, err := vr.ReadDateTime() + sls, err := val.Interface().(utils.Decimal).MarshalText() if err != nil { return err } - if !val.CanSet() || val.Type() != tTime { - return bsoncodec.ValueDecoderError{Name: "TimeDecodeValue", Types: []reflect.Type{tTime}, Received: val} - } - val.Set(reflect.ValueOf(time.Unix(dt/1000, dt%1000*1000000).UTC())) - return nil -} - -func DecimalEncoder(ec bsoncodec.EncodeContext, vw bsonrw.ValueWriter, val reflect.Value) error { - if val.Kind() != reflect.Struct { - return bsoncodec.ValueEncoderError{Name: "DecimalEncoder", Kinds: []reflect.Kind{reflect.Struct}, Received: val} - } - d, ok := val.Interface().(utils.Decimal) - if !ok { - return fmt.Errorf("cannot cast <%+v> to ", val.Interface()) - } - sls, err := d.MarshalText() - if err != nil { - return err - } return vw.WriteBinary(sls) } -func DecimalDecoder(ec bsoncodec.DecodeContext, vw bsonrw.ValueReader, val reflect.Value) error { - if !val.CanSet() || val.Type() != decimalType { - return bsoncodec.ValueEncoderError{Name: "DecimalDecoder", Kinds: []reflect.Kind{reflect.Struct}, Received: val} +func decimalDecoder(dc bsoncodec.DecodeContext, vr bsonrw.ValueReader, val reflect.Value) error { + decimalType := reflect.TypeOf(utils.Decimal{}) + + // All decoder implementations should check that val is valid, settable, + // and is of the correct kind before proceeding. + if !val.IsValid() || !val.CanSet() || val.Type() != decimalType { + return bsoncodec.ValueDecoderError{ + Name: "decimalDecoder", + Types: []reflect.Type{decimalType}, + Received: val, + } } - data, _, err := vw.ReadBinary() + + data, _, err := vr.ReadBinary() if err != nil { return err } @@ -153,95 +147,88 @@ func DecimalDecoder(ec bsoncodec.DecodeContext, vw bsonrw.ValueReader, val refle return nil } -// NewMongoStorage givese new mongo driver -func NewMongoStorage(host, port, db, user, pass, mrshlerStr, storageType string, - cdrsIndexes []string, ttl time.Duration) (ms *MongoStorage, err error) { - url := host - if port != "0" { - url += ":" + port - } - if user != "" && pass != "" { - url = fmt.Sprintf("%s:%s@%s", user, pass, url) - } - var dbName string - if db != "" { - url += "/" + db - dbName = strings.Split(db, "?")[0] // remove extra info after ? - } - ctx := context.Background() - url = "mongodb://" + url - reg := bson.NewRegistryBuilder() - reg.RegisterDecoder(tTime, bsoncodec.ValueDecoderFunc(TimeDecodeValue1)) - reg.RegisterTypeEncoder(decimalType, bsoncodec.ValueEncoderFunc(DecimalEncoder)) - reg.RegisterTypeDecoder(decimalType, bsoncodec.ValueDecoderFunc(DecimalDecoder)) - opt := options.Client(). - ApplyURI(url). - SetRegistry(reg.Build()). - SetServerSelectionTimeout(ttl). - SetRetryWrites(false) // set this option to false because as default it is on true - - client, err := mongo.NewClient(opt) - // client, err := mongo.NewClient(url) - +// NewMongoStorage initializes a new MongoDB storage instance with provided connection parameters and settings. +// Returns an error if the setup fails. +func NewMongoStorage(host, port, db, user, pass, mrshlerStr string, storageType string, + cdrsIndexes []string, ttl time.Duration) (*MongoStorage, error) { + url, err := buildURL("mongodb", host, port, db, user, pass) if err != nil { return nil, err } - err = client.Connect(ctx) - if err != nil { - return nil, err - } - mrshler, err := NewMarshaler(mrshlerStr) - if err != nil { - return nil, err - } - - ms = &MongoStorage{ - client: client, - ctx: ctx, + mongoStorage := &MongoStorage{ + ctx: context.TODO(), ctxTTL: ttl, - db: dbName, - storageType: storageType, - ms: mrshler, cdrsIndexes: cdrsIndexes, + storageType: storageType, + counter: utils.NewCounter(time.Now().UnixNano(), 0), + } + reg := bson.NewRegistry() + decimalType := reflect.TypeOf(utils.Decimal{}) + reg.RegisterTypeEncoder(decimalType, bsoncodec.ValueEncoderFunc(decimalEncoder)) + reg.RegisterTypeDecoder(decimalType, bsoncodec.ValueDecoderFunc(decimalDecoder)) + // serverAPI := options.ServerAPI(options.ServerAPIVersion1).SetStrict(true).SetDeprecationErrors(true) + opts := options.Client(). + ApplyURI(url.String()). + SetRegistry(reg). + SetServerSelectionTimeout(mongoStorage.ctxTTL). + SetRetryWrites(false) // default is true + // SetServerAPIOptions(serverAPI) + + // Create a new client and connect to the server + mongoStorage.client, err = mongo.Connect(mongoStorage.ctx, opts) + if err != nil { + return nil, err } - if err = ms.query(func(sctx mongo.SessionContext) error { - cols, err := ms.client.Database(dbName).ListCollectionNames(sctx, bson.D{}) + mongoStorage.ms, err = NewMarshaler(mrshlerStr) + if err != nil { + return nil, err + } + if db != "" { + // Populate ms.db with the url path after trimming everything after '?'. + mongoStorage.db = strings.Split(db, "?")[0] + } + + err = mongoStorage.query(func(sctx mongo.SessionContext) error { + // Create indexes only if the database is empty or only the version table is present. + cols, err := mongoStorage.client.Database(mongoStorage.db). + ListCollectionNames(sctx, bson.D{}) if err != nil { return err } empty := true - for _, col := range cols { // create indexes only if database is empty or only version table is present + for _, col := range cols { if col != ColVer { empty = false break } } if empty { - return ms.EnsureIndexes() + return mongoStorage.EnsureIndexes() } return nil - }); err != nil { + }) + + if err != nil { return nil, err } - ms.cnter = utils.NewCounter(time.Now().UnixNano(), 0) - return + return mongoStorage, nil } -// MongoStorage struct for new mongo driver +// MongoStorage represents a storage interface for the new MongoDB driver. type MongoStorage struct { client *mongo.Client ctx context.Context ctxTTL time.Duration ctxTTLMutex sync.RWMutex // used for TTL reload db string - storageType string // datadb, stordb + storageType string // DataDB/StorDB ms Marshaler cdrsIndexes []string - cnter *utils.Counter + counter *utils.Counter } -func (ms *MongoStorage) query(argfunc func(ctx mongo.SessionContext) error) (err error) { +func (ms *MongoStorage) query(argfunc func(ctx mongo.SessionContext) error) error { ms.ctxTTLMutex.RLock() ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL) ms.ctxTTLMutex.RUnlock() @@ -249,12 +236,12 @@ func (ms *MongoStorage) query(argfunc func(ctx mongo.SessionContext) error) (err return ms.client.UseSession(ctxSession, argfunc) } -// IsDataDB returns if the storeage is used for DataDb +// IsDataDB returns whether or not the storage is used for DataDB. func (ms *MongoStorage) IsDataDB() bool { return ms.storageType == utils.DataDB } -// SetTTL set the context TTL used for queries (is thread safe) +// SetTTL sets the context TTL used for queries (Thread-safe). func (ms *MongoStorage) SetTTL(ttl time.Duration) { ms.ctxTTLMutex.Lock() ms.ctxTTL = ttl @@ -264,14 +251,14 @@ func (ms *MongoStorage) SetTTL(ttl time.Duration) { func (ms *MongoStorage) enusureIndex(colName string, uniq bool, keys ...string) error { return ms.query(func(sctx mongo.SessionContext) error { col := ms.getCol(colName) - io := options.Index().SetUnique(uniq) + indexOptions := options.Index().SetUnique(uniq) doc := make(bson.D, 0) for _, k := range keys { doc = append(doc, bson.E{Key: k, Value: 1}) } _, err := col.Indexes().CreateOne(sctx, mongo.IndexModel{ Keys: doc, - Options: io, + Options: indexOptions, }) return err }) @@ -289,38 +276,36 @@ func (ms *MongoStorage) getCol(col string) *mongo.Collection { return ms.client.Database(ms.db).Collection(col) } -// GetContext returns the context used for the current DB +// GetContext returns the context used for the current database. func (ms *MongoStorage) GetContext() context.Context { return ms.ctx } func isNotFound(err error) bool { - de, ok := err.(mongo.CommandError) - if !ok { // if still can't converted to the mongo.CommandError check if error do not contains message - return strings.Contains(err.Error(), "ns not found") + var de *mongo.CommandError + + if errors.As(err, &de) { + return de.Code == 26 || de.Message == "ns not found" } - return de.Code == 26 || de.Message == "ns not found" + + // If the error cannot be converted to mongo.CommandError + // check if the error message contains "ns not found" + return strings.Contains(err.Error(), "ns not found") } -func (ms *MongoStorage) ensureIndexesForCol(col string) (err error) { // exported for migrator - if err = ms.dropAllIndexesForCol(col); err != nil && !isNotFound(err) { // make sure you do not have indexes - return +func (ms *MongoStorage) ensureIndexesForCol(col string) error { // exported for migrator + err := ms.dropAllIndexesForCol(col) + if err != nil && !isNotFound(err) { // make sure you do not have indexes + return err } - err = nil switch col { case ColAct, ColApl, ColAAp, ColAtr, ColRpl, ColDst, ColRds, ColLht, ColIndx: - if err = ms.enusureIndex(col, true, "key"); err != nil { - return - } + err = ms.enusureIndex(col, true, "key") case ColRsP, ColRes, ColSqs, ColSqp, ColTps, ColThs, ColRts, ColAttr, ColFlt, ColCpp, ColDpp, ColDph: - if err = ms.enusureIndex(col, true, "tenant", "id"); err != nil { - return - } + err = ms.enusureIndex(col, true, "tenant", "id") case ColRpf, ColShg, ColAcc: - if err = ms.enusureIndex(col, true, "id"); err != nil { - return - } - //StorDB + err = ms.enusureIndex(col, true, "id") + // StorDB case utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPDestinationRates, utils.TBLTPRatingPlans, utils.TBLTPSharedGroups, utils.TBLTPActions, @@ -328,105 +313,89 @@ func (ms *MongoStorage) ensureIndexesForCol(col string) (err error) { // exporte utils.TBLTPStats, utils.TBLTPResources, utils.TBLTPDispatchers, utils.TBLTPDispatcherHosts, utils.TBLTPChargers, utils.TBLTPRoutes, utils.TBLTPThresholds: - if err = ms.enusureIndex(col, true, "tpid", "id"); err != nil { - return - } + err = ms.enusureIndex(col, true, "tpid", "id") case utils.TBLTPRatingProfiles: - if err = ms.enusureIndex(col, true, "tpid", "tenant", - "category", "subject", "loadid"); err != nil { - return + err = ms.enusureIndex(col, true, "tpid", "tenant", + "category", "subject", "loadid") + case utils.SessionCostsTBL: + err = ms.enusureIndex(col, true, CGRIDLow, RunIDLow) + if err == nil { + err = ms.enusureIndex(col, false, OriginHostLow, OriginIDLow) + } + if err == nil { + err = ms.enusureIndex(col, false, RunIDLow, OriginIDLow) } case utils.CDRsTBL: - if err = ms.enusureIndex(col, true, CGRIDLow, RunIDLow, - OriginIDLow); err != nil { - return - } - for _, idxKey := range ms.cdrsIndexes { - if err = ms.enusureIndex(col, false, idxKey); err != nil { - return + err = ms.enusureIndex(col, true, CGRIDLow, RunIDLow, + OriginIDLow) + if err == nil { + for _, idxKey := range ms.cdrsIndexes { + err = ms.enusureIndex(col, false, idxKey) + if err != nil { + break + } } } - case utils.SessionCostsTBL: - if err = ms.enusureIndex(col, true, CGRIDLow, - RunIDLow); err != nil { - return - } - if err = ms.enusureIndex(col, false, OriginHostLow, - OriginIDLow); err != nil { - return - } - if err = ms.enusureIndex(col, false, RunIDLow, - OriginIDLow); err != nil { - return - } } - return + return err } -// EnsureIndexes creates db indexes -func (ms *MongoStorage) EnsureIndexes(cols ...string) (err error) { - if len(cols) != 0 { - for _, col := range cols { - if err = ms.ensureIndexesForCol(col); err != nil { - return +// EnsureIndexes creates database indexes for the specified collections. +func (ms *MongoStorage) EnsureIndexes(cols ...string) error { + if len(cols) == 0 { + if ms.IsDataDB() { + cols = []string{ + ColAct, ColApl, ColAAp, ColAtr, ColRpl, ColDst, ColRds, ColLht, ColIndx, + ColRsP, ColRes, ColSqs, ColSqp, ColTps, ColThs, ColRts, ColAttr, ColFlt, ColCpp, + ColDpp, ColRpf, ColShg, ColAcc, } - } - return - } - if ms.storageType == utils.DataDB { - for _, col := range []string{ColAct, ColApl, ColAAp, ColAtr, - ColRpl, ColDst, ColRds, ColLht, ColIndx, ColRsP, ColRes, ColSqs, ColSqp, - ColTps, ColThs, ColRts, ColAttr, ColFlt, ColCpp, ColDpp, - ColRpf, ColShg, ColAcc} { - if err = ms.ensureIndexesForCol(col); err != nil { - return + } else { + cols = []string{ + utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPDestinationRates, + utils.TBLTPRatingPlans, utils.TBLTPSharedGroups, utils.TBLTPActions, utils.TBLTPActionPlans, + utils.TBLTPActionTriggers, utils.TBLTPStats, utils.TBLTPResources, utils.TBLTPRatingProfiles, + utils.CDRsTBL, utils.SessionCostsTBL, } } } - if ms.storageType == utils.StorDB { - for _, col := range []string{utils.TBLTPTimings, utils.TBLTPDestinations, - utils.TBLTPDestinationRates, utils.TBLTPRatingPlans, - utils.TBLTPSharedGroups, utils.TBLTPActions, - utils.TBLTPActionPlans, utils.TBLTPActionTriggers, - utils.TBLTPStats, utils.TBLTPResources, - utils.TBLTPRatingProfiles, utils.CDRsTBL, utils.SessionCostsTBL} { - if err = ms.ensureIndexesForCol(col); err != nil { - return - } + for _, col := range cols { + if err := ms.ensureIndexesForCol(col); err != nil { + return err } } - return + return nil } -// Close disconnects the client +// Close disconnects the MongoDB client. func (ms *MongoStorage) Close() { if err := ms.client.Disconnect(ms.ctx); err != nil { utils.Logger.Err(fmt.Sprintf(" Error on disconnect:%s", err)) } } -// Flush drops the datatable -func (ms *MongoStorage) Flush(ignore string) (err error) { +// Flush drops the datatable and recreates the indexes. +func (ms *MongoStorage) Flush(_ string) (err error) { return ms.query(func(sctx mongo.SessionContext) error { - if err = ms.client.Database(ms.db).Drop(sctx); err != nil { + err := ms.client.Database(ms.db).Drop(sctx) + if err != nil { return err } - return ms.EnsureIndexes() // recreate the indexes + return ms.EnsureIndexes() }) } -// DB returnes a database object +// DB returns the database object associated with the MongoDB client. func (ms *MongoStorage) DB() *mongo.Database { return ms.client.Database(ms.db) } -// SelectDatabase selects the database -func (ms *MongoStorage) SelectDatabase(dbName string) (err error) { +// SelectDatabase selects the specified database. +func (ms *MongoStorage) SelectDatabase(dbName string) error { ms.db = dbName - return + return nil } -func (ms *MongoStorage) RemoveKeysForPrefix(prefix string) (err error) { +func (ms *MongoStorage) RemoveKeysForPrefix(prefix string) error { var colName string switch prefix { case utils.DestinationPrefix: @@ -483,8 +452,8 @@ func (ms *MongoStorage) RemoveKeysForPrefix(prefix string) (err error) { }) } -// IsDBEmpty implementation -func (ms *MongoStorage) IsDBEmpty() (resp bool, err error) { +// IsDBEmpty checks if the database is empty by verifying if each collection is empty. +func (ms *MongoStorage) IsDBEmpty() (isEmpty bool, err error) { err = ms.query(func(sctx mongo.SessionContext) error { cols, err := ms.DB().ListCollectionNames(sctx, bson.D{}) if err != nil { @@ -494,26 +463,33 @@ func (ms *MongoStorage) IsDBEmpty() (resp bool, err error) { if col == utils.CDRsTBL { // ignore cdrs collection continue } - var count int64 - if count, err = ms.getCol(col).CountDocuments(sctx, bson.D{}, options.Count().SetLimit(1)); err != nil { // check if collection is empty so limit the count to 1 + count, err := ms.getCol(col).CountDocuments(sctx, bson.D{}, options.Count().SetLimit(1)) // limiting the count to 1 since we are only checking if the collection is empty + if err != nil { return err } if count != 0 { return nil } } - resp = true + isEmpty = true return nil }) - return resp, err + return isEmpty, err } -func (ms *MongoStorage) getField(sctx mongo.SessionContext, col, prefix, subject, field string) (result []string, err error) { - fieldResult := bson.D{} +func (ms *MongoStorage) getAllKeysMatchingField(sctx mongo.SessionContext, col, prefix, + subject, field string) (keys []string, err error) { + fieldResult := bson.M{} iter, err := ms.getCol(col).Find(sctx, - bson.M{field: bsonx.Regex(subject, "")}, + bson.M{ + field: primitive.Regex{ + Pattern: subject, + }, + }, options.Find().SetProjection( - bson.M{field: 1}, + bson.M{ + field: 1, + }, ), ) if err != nil { @@ -524,19 +500,22 @@ func (ms *MongoStorage) getField(sctx mongo.SessionContext, col, prefix, subject if err != nil { return } - result = append(result, prefix+fieldResult.Map()[field].(string)) + keys = append(keys, prefix+fieldResult[field].(string)) } - return result, iter.Close(sctx) + return keys, iter.Close(sctx) } -func (ms *MongoStorage) getField2(sctx mongo.SessionContext, col, prefix, subject string, tntID *utils.TenantID) (result []string, err error) { - idResult := struct{ Tenant, Id string }{} +func (ms *MongoStorage) getAllKeysMatchingTenantID(sctx mongo.SessionContext, col, prefix, + subject string, tntID *utils.TenantID) (keys []string, err error) { + idResult := struct{ Tenant, ID string }{} elem := bson.M{} if tntID.Tenant != "" { elem["tenant"] = tntID.Tenant } if tntID.ID != "" { - elem["id"] = bsonx.Regex(subject, "") + elem["id"] = primitive.Regex{ + Pattern: subject, + } } iter, err := ms.getCol(col).Find(sctx, elem, options.Find().SetProjection(bson.M{"tenant": 1, "id": 1}), @@ -549,17 +528,21 @@ func (ms *MongoStorage) getField2(sctx mongo.SessionContext, col, prefix, subjec if err != nil { return } - result = append(result, prefix+utils.ConcatenatedKey(idResult.Tenant, idResult.Id)) + keys = append(keys, prefix+utils.ConcatenatedKey(idResult.Tenant, idResult.ID)) } - return result, iter.Close(sctx) + return keys, iter.Close(sctx) } -func (ms *MongoStorage) getField3(sctx mongo.SessionContext, col, prefix, field string) (result []string, err error) { - fieldResult := bson.D{} - iter, err := ms.getCol(col).Find(sctx, - bson.M{field: bsonx.Regex(fmt.Sprintf("^%s", prefix), "")}, +func (ms *MongoStorage) getAllIndexKeys(sctx mongo.SessionContext, prefix string) (keys []string, err error) { + fieldResult := bson.M{} + iter, err := ms.getCol(ColIndx).Find(sctx, + bson.M{ + "key": primitive.Regex{ + Pattern: "^" + prefix, + }, + }, options.Find().SetProjection( - bson.M{field: 1}, + bson.M{"key": 1}, ), ) if err != nil { @@ -570,96 +553,98 @@ func (ms *MongoStorage) getField3(sctx mongo.SessionContext, col, prefix, field if err != nil { return } - result = append(result, fieldResult.Map()[field].(string)) + keys = append(keys, fieldResult["key"].(string)) } - return result, iter.Close(sctx) + return keys, iter.Close(sctx) } -// GetKeysForPrefix implementation -func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err error) { +// GetKeysForPrefix retrieves keys matching the specified prefix across different categories. +func (ms *MongoStorage) GetKeysForPrefix(prefix string) (keys []string, err error) { var category, subject string keyLen := len(utils.DestinationPrefix) if len(prefix) < keyLen { - return nil, fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix) + return nil, fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix) } category = prefix[:keyLen] // prefix length tntID := utils.NewTenantID(prefix[keyLen:]) - subject = fmt.Sprintf("^%s", prefix[keyLen:]) // old way, no tenant support - err = ms.query(func(sctx mongo.SessionContext) (err error) { + subject = "^" + prefix[keyLen:] // old way, no tenant support + err = ms.query(func(sctx mongo.SessionContext) error { + var qryErr error switch category { case utils.DestinationPrefix: - result, err = ms.getField(sctx, ColDst, utils.DestinationPrefix, subject, "key") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColDst, utils.DestinationPrefix, subject, "key") case utils.ReverseDestinationPrefix: - result, err = ms.getField(sctx, ColRds, utils.ReverseDestinationPrefix, subject, "key") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColRds, utils.ReverseDestinationPrefix, subject, "key") case utils.RatingPlanPrefix: - result, err = ms.getField(sctx, ColRpl, utils.RatingPlanPrefix, subject, "key") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColRpl, utils.RatingPlanPrefix, subject, "key") case utils.RatingProfilePrefix: if strings.HasPrefix(prefix[keyLen:], utils.MetaOut) { - subject = fmt.Sprintf("^\\%s", prefix[keyLen:]) // rewrite the id cause it start with * from `*out` + // Rewrite the id as it starts with '*' (from "*out"). + subject = "^\\" + prefix[keyLen:] } - result, err = ms.getField(sctx, ColRpf, utils.RatingProfilePrefix, subject, "id") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColRpf, utils.RatingProfilePrefix, subject, "id") case utils.ActionPrefix: - result, err = ms.getField(sctx, ColAct, utils.ActionPrefix, subject, "key") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColAct, utils.ActionPrefix, subject, "key") case utils.ActionPlanPrefix: - result, err = ms.getField(sctx, ColApl, utils.ActionPlanPrefix, subject, "key") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColApl, utils.ActionPlanPrefix, subject, "key") case utils.ActionTriggerPrefix: - result, err = ms.getField(sctx, ColAtr, utils.ActionTriggerPrefix, subject, "key") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColAtr, utils.ActionTriggerPrefix, subject, "key") case utils.SharedGroupPrefix: - result, err = ms.getField(sctx, ColShg, utils.SharedGroupPrefix, subject, "id") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColShg, utils.SharedGroupPrefix, subject, "id") case utils.AccountPrefix: - result, err = ms.getField(sctx, ColAcc, utils.AccountPrefix, subject, "id") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColAcc, utils.AccountPrefix, subject, "id") case utils.ResourceProfilesPrefix: - result, err = ms.getField2(sctx, ColRsP, utils.ResourceProfilesPrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColRsP, utils.ResourceProfilesPrefix, subject, tntID) case utils.ResourcesPrefix: - result, err = ms.getField2(sctx, ColRes, utils.ResourcesPrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColRes, utils.ResourcesPrefix, subject, tntID) case utils.StatQueuePrefix: - result, err = ms.getField2(sctx, ColSqs, utils.StatQueuePrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColSqs, utils.StatQueuePrefix, subject, tntID) case utils.StatQueueProfilePrefix: - result, err = ms.getField2(sctx, ColSqp, utils.StatQueueProfilePrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColSqp, utils.StatQueueProfilePrefix, subject, tntID) case utils.AccountActionPlansPrefix: - result, err = ms.getField(sctx, ColAAp, utils.AccountActionPlansPrefix, subject, "key") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColAAp, utils.AccountActionPlansPrefix, subject, "key") case utils.TimingsPrefix: - result, err = ms.getField(sctx, ColTmg, utils.TimingsPrefix, subject, "id") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColTmg, utils.TimingsPrefix, subject, "id") case utils.FilterPrefix: - result, err = ms.getField2(sctx, ColFlt, utils.FilterPrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColFlt, utils.FilterPrefix, subject, tntID) case utils.ThresholdPrefix: - result, err = ms.getField2(sctx, ColThs, utils.ThresholdPrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColThs, utils.ThresholdPrefix, subject, tntID) case utils.ThresholdProfilePrefix: - result, err = ms.getField2(sctx, ColTps, utils.ThresholdProfilePrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColTps, utils.ThresholdProfilePrefix, subject, tntID) case utils.RouteProfilePrefix: - result, err = ms.getField2(sctx, ColRts, utils.RouteProfilePrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColRts, utils.RouteProfilePrefix, subject, tntID) case utils.AttributeProfilePrefix: - result, err = ms.getField2(sctx, ColAttr, utils.AttributeProfilePrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColAttr, utils.AttributeProfilePrefix, subject, tntID) case utils.ChargerProfilePrefix: - result, err = ms.getField2(sctx, ColCpp, utils.ChargerProfilePrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColCpp, utils.ChargerProfilePrefix, subject, tntID) case utils.DispatcherProfilePrefix: - result, err = ms.getField2(sctx, ColDpp, utils.DispatcherProfilePrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColDpp, utils.DispatcherProfilePrefix, subject, tntID) case utils.DispatcherHostPrefix: - result, err = ms.getField2(sctx, ColDph, utils.DispatcherHostPrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColDph, utils.DispatcherHostPrefix, subject, tntID) case utils.AttributeFilterIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.AttributeFilterIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.AttributeFilterIndexes) case utils.ResourceFilterIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.ResourceFilterIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.ResourceFilterIndexes) case utils.StatFilterIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.StatFilterIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.StatFilterIndexes) case utils.ThresholdFilterIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.ThresholdFilterIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.ThresholdFilterIndexes) case utils.RouteFilterIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.RouteFilterIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.RouteFilterIndexes) case utils.ChargerFilterIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.ChargerFilterIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.ChargerFilterIndexes) case utils.DispatcherFilterIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.DispatcherFilterIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.DispatcherFilterIndexes) case utils.ActionPlanIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.ActionPlanIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.ActionPlanIndexes) case utils.FilterIndexPrfx: - result, err = ms.getField3(sctx, ColIndx, utils.FilterIndexPrfx, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.FilterIndexPrfx) default: - err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix) + qryErr = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix) } - return err + return qryErr }) - return + return keys, err } func (ms *MongoStorage) HasDataDrv(category, subject, tenant string) (has bool, err error) { @@ -711,21 +696,20 @@ func (ms *MongoStorage) HasDataDrv(category, subject, tenant string) (has bool, return has, err } -func (ms *MongoStorage) GetRatingPlanDrv(key string) (rp *RatingPlan, err error) { +func (ms *MongoStorage) GetRatingPlanDrv(key string) (*RatingPlan, error) { var kv struct { Key string Value []byte } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColRpl).FindOne(sctx, bson.M{"key": key}) - if err := cur.Decode(&kv); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) (err error) { + sr := ms.getCol(ColRpl).FindOne(sctx, bson.M{"key": key}) + decodeErr := sr.Decode(&kv) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { + return decodeErr + }) + if err != nil { return nil, err } @@ -738,11 +722,13 @@ func (ms *MongoStorage) GetRatingPlanDrv(key string) (rp *RatingPlan, err error) if err != nil { return nil, err } - r.Close() - if err = ms.ms.Unmarshal(out, &rp); err != nil { + err = r.Close() + if err != nil { return nil, err } - return + var ratingPlan *RatingPlan + err = ms.ms.Unmarshal(out, &ratingPlan) + return ratingPlan, err } func (ms *MongoStorage) SetRatingPlanDrv(rp *RatingPlan) error { @@ -750,12 +736,19 @@ func (ms *MongoStorage) SetRatingPlanDrv(rp *RatingPlan) error { if err != nil { return err } + var b bytes.Buffer w := zlib.NewWriter(&b) - w.Write(result) - w.Close() - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRpl).UpdateOne(sctx, bson.M{"key": rp.Id}, + _, err = w.Write(result) + if err != nil { + return err + } + err = w.Close() + if err != nil { + return err + } + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColRpl).UpdateOne(sctx, bson.M{"key": rp.Id}, bson.M{"$set": struct { Key string Value []byte @@ -767,7 +760,7 @@ func (ms *MongoStorage) SetRatingPlanDrv(rp *RatingPlan) error { } func (ms *MongoStorage) RemoveRatingPlanDrv(key string) error { - return ms.query(func(sctx mongo.SessionContext) (err error) { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColRpl).DeleteMany(sctx, bson.M{"key": key}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -776,25 +769,22 @@ func (ms *MongoStorage) RemoveRatingPlanDrv(key string) error { }) } -func (ms *MongoStorage) GetRatingProfileDrv(key string) (rp *RatingProfile, err error) { - rp = new(RatingProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColRpf).FindOne(sctx, bson.M{"id": key}) - if err := cur.Decode(rp); err != nil { - rp = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetRatingProfileDrv(key string) (*RatingProfile, error) { + rtProfile := new(RatingProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColRpf).FindOne(sctx, bson.M{"id": key}) + decodeErr := sr.Decode(rtProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return rtProfile, err } -func (ms *MongoStorage) SetRatingProfileDrv(rp *RatingProfile) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRpf).UpdateOne(sctx, bson.M{"id": rp.Id}, +func (ms *MongoStorage) SetRatingProfileDrv(rp *RatingProfile) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColRpf).UpdateOne(sctx, bson.M{"id": rp.Id}, bson.M{"$set": rp}, options.Update().SetUpsert(true), ) @@ -803,7 +793,7 @@ func (ms *MongoStorage) SetRatingProfileDrv(rp *RatingProfile) (err error) { } func (ms *MongoStorage) RemoveRatingProfileDrv(key string) error { - return ms.query(func(sctx mongo.SessionContext) (err error) { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColRpf).DeleteMany(sctx, bson.M{"id": key}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -812,21 +802,22 @@ func (ms *MongoStorage) RemoveRatingProfileDrv(key string) error { }) } -func (ms *MongoStorage) GetDestinationDrv(key, transactionID string) (result *Destination, err error) { +func (ms *MongoStorage) GetDestinationDrv(key, transactionID string) (*Destination, error) { var kv struct { Key string Value []byte } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColDst).FindOne(sctx, bson.M{"key": key}) - if err := cur.Decode(&kv); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) (err error) { + sr := ms.getCol(ColDst).FindOne(sctx, bson.M{"key": key}) + decodeErr := sr.Decode(&kv) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + Cache.Set(utils.CacheDestinations, key, nil, nil, + cacheCommit(transactionID), transactionID) + return utils.ErrNotFound } - return nil - }); err != nil { + return decodeErr + }) + if err != nil { return nil, err } b := bytes.NewBuffer(kv.Value) @@ -838,21 +829,31 @@ func (ms *MongoStorage) GetDestinationDrv(key, transactionID string) (result *De if err != nil { return nil, err } - r.Close() - err = ms.ms.Unmarshal(out, &result) - return + err = r.Close() + if err != nil { + return nil, err + } + var dst *Destination + err = ms.ms.Unmarshal(out, &dst) + return dst, err } -func (ms *MongoStorage) SetDestinationDrv(dest *Destination, transactionID string) (err error) { +func (ms *MongoStorage) SetDestinationDrv(dest *Destination, _ string) error { result, err := ms.ms.Marshal(dest) if err != nil { return err } var b bytes.Buffer w := zlib.NewWriter(&b) - w.Write(result) - w.Close() - return ms.query(func(sctx mongo.SessionContext) (err error) { + _, err = w.Write(result) + if err != nil { + return err + } + err = w.Close() + if err != nil { + return err + } + return ms.query(func(sctx mongo.SessionContext) error { _, err = ms.getCol(ColDst).UpdateOne(sctx, bson.M{"key": dest.Id}, bson.M{"$set": struct { Key string @@ -865,8 +866,8 @@ func (ms *MongoStorage) SetDestinationDrv(dest *Destination, transactionID strin } func (ms *MongoStorage) RemoveDestinationDrv(destID string, - transactionID string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { + transactionID string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColDst).DeleteOne(sctx, bson.M{"key": destID}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -875,74 +876,68 @@ func (ms *MongoStorage) RemoveDestinationDrv(destID string, }) } -func (ms *MongoStorage) RemoveReverseDestinationDrv(dstID, prfx, transactionID string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRds).UpdateOne(sctx, bson.M{"key": prfx}, +func (ms *MongoStorage) RemoveReverseDestinationDrv(dstID, prfx, transactionID string) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColRds).UpdateOne(sctx, bson.M{"key": prfx}, bson.M{"$pull": bson.M{"value": dstID}}) return err }) } -func (ms *MongoStorage) GetReverseDestinationDrv(prefix, transactionID string) (ids []string, err error) { +func (ms *MongoStorage) GetReverseDestinationDrv(prefix, transactionID string) ([]string, error) { var result struct { Key string Value []string } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColRds).FindOne(sctx, bson.M{"key": prefix}) - if err := cur.Decode(&result); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColRds).FindOne(sctx, bson.M{"key": prefix}) + decodeErr := sr.Decode(&result) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { + return decodeErr + }) + if err != nil { return nil, err } - ids = result.Value - return + return result.Value, nil } -func (ms *MongoStorage) SetReverseDestinationDrv(destID string, prefixes []string, transactionID string) (err error) { +func (ms *MongoStorage) SetReverseDestinationDrv(destID string, prefixes []string, _ string) error { for _, p := range prefixes { - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRds).UpdateOne(sctx, bson.M{"key": p}, + err := ms.query(func(sctx mongo.SessionContext) error { + _, qryErr := ms.getCol(ColRds).UpdateOne(sctx, bson.M{"key": p}, bson.M{"$addToSet": bson.M{"value": destID}}, options.Update().SetUpsert(true), ) - return err - }); err != nil { + return qryErr + }) + if err != nil { return err } } return nil } -func (ms *MongoStorage) GetActionsDrv(key string) (as Actions, err error) { +func (ms *MongoStorage) GetActionsDrv(key string) (Actions, error) { var result struct { Key string Value Actions } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColAct).FindOne(sctx, bson.M{"key": key}) - if err := cur.Decode(&result); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColAct).FindOne(sctx, bson.M{"key": key}) + decodeErr := sr.Decode(&result) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { - return nil, err - } - as = result.Value - return + return decodeErr + }) + return result.Value, err } func (ms *MongoStorage) SetActionsDrv(key string, as Actions) error { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColAct).UpdateOne(sctx, bson.M{"key": key}, + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColAct).UpdateOne(sctx, bson.M{"key": key}, bson.M{"$set": struct { Key string Value Actions @@ -954,7 +949,7 @@ func (ms *MongoStorage) SetActionsDrv(key string, as Actions) error { } func (ms *MongoStorage) RemoveActionsDrv(key string) error { - return ms.query(func(sctx mongo.SessionContext) (err error) { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColAct).DeleteOne(sctx, bson.M{"key": key}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -963,25 +958,22 @@ func (ms *MongoStorage) RemoveActionsDrv(key string) error { }) } -func (ms *MongoStorage) GetSharedGroupDrv(key string) (sg *SharedGroup, err error) { - sg = new(SharedGroup) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColShg).FindOne(sctx, bson.M{"id": key}) - if err := cur.Decode(sg); err != nil { - sg = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetSharedGroupDrv(key string) (*SharedGroup, error) { + sg := new(SharedGroup) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColShg).FindOne(sctx, bson.M{"id": key}) + decodeErr := sr.Decode(sg) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return sg, err } -func (ms *MongoStorage) SetSharedGroupDrv(sg *SharedGroup) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColShg).UpdateOne(sctx, bson.M{"id": sg.Id}, +func (ms *MongoStorage) SetSharedGroupDrv(sg *SharedGroup) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColShg).UpdateOne(sctx, bson.M{"id": sg.Id}, bson.M{"$set": sg}, options.Update().SetUpsert(true), ) @@ -989,8 +981,8 @@ func (ms *MongoStorage) SetSharedGroupDrv(sg *SharedGroup) (err error) { }) } -func (ms *MongoStorage) RemoveSharedGroupDrv(id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveSharedGroupDrv(id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColShg).DeleteOne(sctx, bson.M{"id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -999,20 +991,17 @@ func (ms *MongoStorage) RemoveSharedGroupDrv(id string) (err error) { }) } -func (ms *MongoStorage) GetAccountDrv(key string) (result *Account, err error) { - result = new(Account) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColAcc).FindOne(sctx, bson.M{"id": key}) - if err := cur.Decode(result); err != nil { - result = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetAccountDrv(key string) (*Account, error) { + acc := new(Account) + err := ms.query(func(sctx mongo.SessionContext) (err error) { + sr := ms.getCol(ColAcc).FindOne(sctx, bson.M{"id": key}) + decodeErr := sr.Decode(acc) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return acc, err } func (ms *MongoStorage) SetAccountDrv(acc *Account) error { @@ -1020,7 +1009,8 @@ func (ms *MongoStorage) SetAccountDrv(acc *Account) error { // UPDATE: if all balances expired and were cleaned it makes // sense to write empty balance map if len(acc.BalanceMap) == 0 { - if ac, err := ms.GetAccountDrv(acc.ID); err == nil && !ac.allBalancesExpired() { + ac, err := ms.GetAccountDrv(acc.ID) + if err == nil && !ac.allBalancesExpired() { ac.ActionTriggers = acc.ActionTriggers ac.UnitCounters = acc.UnitCounters ac.AllowNegative = acc.AllowNegative @@ -1029,8 +1019,8 @@ func (ms *MongoStorage) SetAccountDrv(acc *Account) error { } } acc.UpdateTime = time.Now() - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColAcc).UpdateOne(sctx, bson.M{"id": acc.ID}, + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColAcc).UpdateOne(sctx, bson.M{"id": acc.ID}, bson.M{"$set": acc}, options.Update().SetUpsert(true), ) @@ -1038,8 +1028,8 @@ func (ms *MongoStorage) SetAccountDrv(acc *Account) error { }) } -func (ms *MongoStorage) RemoveAccountDrv(key string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveAccountDrv(key string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColAcc).DeleteOne(sctx, bson.M{"id": key}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1048,16 +1038,20 @@ func (ms *MongoStorage) RemoveAccountDrv(key string) (err error) { }) } -// Limit will only retrieve the last n items out of history, newest first +// GetLoadHistory retrieves the last n items from the load history, newest first. func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool, - transactionID string) (loadInsts []*utils.LoadInstance, err error) { + transactionID string) ([]*utils.LoadInstance, error) { if limit == 0 { return nil, nil } if !skipCache { - if x, ok := Cache.Get(utils.LoadInstKey, ""); ok { + x, ok := Cache.Get(utils.LoadInstKey, "") + if ok { if x != nil { - items := x.([]*utils.LoadInstance) + items, ok := x.([]*utils.LoadInstance) + if !ok { + return nil, utils.ErrCastFailed + } if len(items) < limit || limit == -1 { return items, nil } @@ -1070,66 +1064,61 @@ func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool, Key string Value []*utils.LoadInstance } - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColLht).FindOne(sctx, bson.M{"key": utils.LoadInstKey}) - if err := cur.Decode(&kv); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColLht).FindOne(sctx, bson.M{"key": utils.LoadInstKey}) + decodeErr := sr.Decode(&kv) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) cCommit := cacheCommit(transactionID) if err == nil { - loadInsts = kv.Value if errCh := Cache.Remove(utils.LoadInstKey, "", cCommit, transactionID); errCh != nil { return nil, errCh } - if errCh := Cache.Set(utils.LoadInstKey, "", loadInsts, nil, cCommit, transactionID); errCh != nil { + if errCh := Cache.Set(utils.LoadInstKey, "", kv.Value, nil, cCommit, transactionID); errCh != nil { return nil, errCh } } - if len(loadInsts) < limit || limit == -1 { - return loadInsts, nil + if len(kv.Value) < limit || limit == -1 { + return kv.Value, nil } - return loadInsts[:limit], nil + return kv.Value[:limit], nil } -// Adds a single load instance to load history +// AddLoadHistory adds a single load instance to the load history. func (ms *MongoStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize int, transactionID string) error { if loadHistSize == 0 { // Load history disabled return nil } - // get existing load history + // Get existing load history. var existingLoadHistory []*utils.LoadInstance var kv struct { Key string Value []*utils.LoadInstance } - if err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColLht).FindOne(sctx, bson.M{"key": utils.LoadInstKey}) - if err := cur.Decode(&kv); err != nil { - if err == mongo.ErrNoDocuments { - return nil // utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColLht).FindOne(sctx, bson.M{"key": utils.LoadInstKey}) + decodeErr := sr.Decode(&kv) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return nil // utils.ErrNotFound } - return nil - }); err != nil { - return err - } + return decodeErr + }) if kv.Value != nil { existingLoadHistory = kv.Value } - err := guardian.Guardian.Guard(func() error { // Make sure we do it locked since other instance can modify history while we read it - // insert on first position + + // Make sure we do it locked since other instances can modify the history while we read it. + err = guardian.Guardian.Guard(func() error { + + // Insert at the first position. existingLoadHistory = append(existingLoadHistory, nil) copy(existingLoadHistory[1:], existingLoadHistory[0:]) existingLoadHistory[0] = ldInst - //check length histLen := len(existingLoadHistory) if histLen >= loadHistSize { // Have hit maximum history allowed, remove oldest element in order to add new one existingLoadHistory = existingLoadHistory[:loadHistSize] @@ -1153,36 +1142,31 @@ func (ms *MongoStorage) AddLoadHistory(ldInst *utils.LoadInstance, return err } -func (ms *MongoStorage) GetActionTriggersDrv(key string) (atrs ActionTriggers, err error) { +func (ms *MongoStorage) GetActionTriggersDrv(key string) (ActionTriggers, error) { var kv struct { Key string Value ActionTriggers } - if err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColAtr).FindOne(sctx, bson.M{"key": key}) - if err := cur.Decode(&kv); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColAtr).FindOne(sctx, bson.M{"key": key}) + decodeErr := sr.Decode(&kv) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { - return nil, err - } - atrs = kv.Value - return + return decodeErr + }) + return kv.Value, err } -func (ms *MongoStorage) SetActionTriggersDrv(key string, atrs ActionTriggers) (err error) { +func (ms *MongoStorage) SetActionTriggersDrv(key string, atrs ActionTriggers) error { if len(atrs) == 0 { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColAtr).DeleteOne(sctx, bson.M{"key": key}) + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColAtr).DeleteOne(sctx, bson.M{"key": key}) return err }) } - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColAtr).UpdateOne(sctx, bson.M{"key": key}, + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColAtr).UpdateOne(sctx, bson.M{"key": key}, bson.M{"$set": struct { Key string Value ActionTriggers @@ -1195,7 +1179,7 @@ func (ms *MongoStorage) SetActionTriggersDrv(key string, atrs ActionTriggers) (e func (ms *MongoStorage) RemoveActionTriggersDrv(key string) error { - return ms.query(func(sctx mongo.SessionContext) (err error) { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColAtr).DeleteOne(sctx, bson.M{"key": key}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1204,21 +1188,20 @@ func (ms *MongoStorage) RemoveActionTriggersDrv(key string) error { }) } -func (ms *MongoStorage) GetActionPlanDrv(key string) (ats *ActionPlan, err error) { +func (ms *MongoStorage) GetActionPlanDrv(key string) (*ActionPlan, error) { var kv struct { Key string Value []byte } - if err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColApl).FindOne(sctx, bson.M{"key": key}) - if err := cur.Decode(&kv); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColApl).FindOne(sctx, bson.M{"key": key}) + decodeErr := sr.Decode(&kv) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { + return decodeErr + }) + if err != nil { return nil, err } b := bytes.NewBuffer(kv.Value) @@ -1230,22 +1213,32 @@ func (ms *MongoStorage) GetActionPlanDrv(key string) (ats *ActionPlan, err error if err != nil { return nil, err } - r.Close() - err = ms.ms.Unmarshal(out, &ats) - return + err = r.Close() + if err != nil { + return nil, err + } + var ap *ActionPlan + err = ms.ms.Unmarshal(out, &ap) + return ap, err } -func (ms *MongoStorage) SetActionPlanDrv(key string, ats *ActionPlan) (err error) { +func (ms *MongoStorage) SetActionPlanDrv(key string, ats *ActionPlan) error { result, err := ms.ms.Marshal(ats) if err != nil { return err } var b bytes.Buffer w := zlib.NewWriter(&b) - w.Write(result) - w.Close() - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColApl).UpdateOne(sctx, bson.M{"key": key}, + _, err = w.Write(result) + if err != nil { + return err + } + err = w.Close() + if err != nil { + return err + } + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColApl).UpdateOne(sctx, bson.M{"key": key}, bson.M{"$set": struct { Key string Value []byte @@ -1257,13 +1250,13 @@ func (ms *MongoStorage) SetActionPlanDrv(key string, ats *ActionPlan) (err error } func (ms *MongoStorage) RemoveActionPlanDrv(key string) error { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColApl).DeleteOne(sctx, bson.M{"key": key}) + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColApl).DeleteOne(sctx, bson.M{"key": key}) return err }) } -func (ms *MongoStorage) GetAllActionPlansDrv() (ats map[string]*ActionPlan, err error) { +func (ms *MongoStorage) GetAllActionPlansDrv() (map[string]*ActionPlan, error) { keys, err := ms.GetKeysForPrefix(utils.ActionPlanPrefix) if err != nil { return nil, err @@ -1271,41 +1264,36 @@ func (ms *MongoStorage) GetAllActionPlansDrv() (ats map[string]*ActionPlan, err if len(keys) == 0 { return nil, utils.ErrNotFound } - ats = make(map[string]*ActionPlan, len(keys)) + actionPlans := make(map[string]*ActionPlan, len(keys)) for _, key := range keys { ap, err := ms.GetActionPlanDrv(key[len(utils.ActionPlanPrefix):]) if err != nil { return nil, err } - ats[key[len(utils.ActionPlanPrefix):]] = ap + actionPlans[key[len(utils.ActionPlanPrefix):]] = ap } - return + return actionPlans, nil } -func (ms *MongoStorage) GetAccountActionPlansDrv(acntID string) (aPlIDs []string, err error) { +func (ms *MongoStorage) GetAccountActionPlansDrv(acntID string) ([]string, error) { var kv struct { Key string Value []string } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColAAp).FindOne(sctx, bson.M{"key": acntID}) - if err := cur.Decode(&kv); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColAAp).FindOne(sctx, bson.M{"key": acntID}) + decodeErr := sr.Decode(&kv) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { - return nil, err - } - aPlIDs = kv.Value - return + return decodeErr + }) + return kv.Value, err } -func (ms *MongoStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColAAp).UpdateOne(sctx, bson.M{"key": acntID}, +func (ms *MongoStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColAAp).UpdateOne(sctx, bson.M{"key": acntID}, bson.M{"$set": struct { Key string Value []string @@ -1317,8 +1305,8 @@ func (ms *MongoStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string) } // ToDo: check return len(aPlIDs) == 0 -func (ms *MongoStorage) RemAccountActionPlansDrv(acntID string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemAccountActionPlansDrv(acntID string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColAAp).DeleteOne(sctx, bson.M{"key": acntID}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1334,45 +1322,38 @@ func (ms *MongoStorage) PushTask(t *Task) error { }) } -func (ms *MongoStorage) PopTask() (t *Task, err error) { +func (ms *MongoStorage) PopTask() (*Task, error) { v := struct { ID primitive.ObjectID `bson:"_id"` Task *Task }{} - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColTsk).FindOneAndDelete(sctx, bson.D{}) - if err := cur.Decode(&v); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColTsk).FindOneAndDelete(sctx, bson.D{}) + decodeErr := sr.Decode(&v) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { - return nil, err - } - return v.Task, nil -} - -func (ms *MongoStorage) GetResourceProfileDrv(tenant, id string) (rp *ResourceProfile, err error) { - rp = new(ResourceProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColRsP).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(rp); err != nil { - rp = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err - } - return nil + return decodeErr }) - return + return v.Task, err } -func (ms *MongoStorage) SetResourceProfileDrv(rp *ResourceProfile) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRsP).UpdateOne(sctx, bson.M{"tenant": rp.Tenant, "id": rp.ID}, +func (ms *MongoStorage) GetResourceProfileDrv(tenant, id string) (*ResourceProfile, error) { + rsProfile := new(ResourceProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColRsP).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(rsProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound + } + return decodeErr + }) + return rsProfile, err +} + +func (ms *MongoStorage) SetResourceProfileDrv(rp *ResourceProfile) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColRsP).UpdateOne(sctx, bson.M{"tenant": rp.Tenant, "id": rp.ID}, bson.M{"$set": rp}, options.Update().SetUpsert(true), ) @@ -1380,8 +1361,8 @@ func (ms *MongoStorage) SetResourceProfileDrv(rp *ResourceProfile) (err error) { }) } -func (ms *MongoStorage) RemoveResourceProfileDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveResourceProfileDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColRsP).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1390,25 +1371,22 @@ func (ms *MongoStorage) RemoveResourceProfileDrv(tenant, id string) (err error) }) } -func (ms *MongoStorage) GetResourceDrv(tenant, id string) (r *Resource, err error) { - r = new(Resource) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColRes).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - r = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetResourceDrv(tenant, id string) (*Resource, error) { + resource := new(Resource) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColRes).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(resource) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return resource, err } -func (ms *MongoStorage) SetResourceDrv(r *Resource) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRes).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetResourceDrv(r *Resource) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColRes).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1416,8 +1394,8 @@ func (ms *MongoStorage) SetResourceDrv(r *Resource) (err error) { }) } -func (ms *MongoStorage) RemoveResourceDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveResourceDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColRes).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1426,25 +1404,22 @@ func (ms *MongoStorage) RemoveResourceDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetTimingDrv(id string) (t *utils.TPTiming, err error) { - t = new(utils.TPTiming) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColTmg).FindOne(sctx, bson.M{"id": id}) - if err := cur.Decode(t); err != nil { - t = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetTimingDrv(id string) (*utils.TPTiming, error) { + timing := new(utils.TPTiming) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColTmg).FindOne(sctx, bson.M{"id": id}) + decodeErr := sr.Decode(timing) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return timing, err } -func (ms *MongoStorage) SetTimingDrv(t *utils.TPTiming) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColTmg).UpdateOne(sctx, bson.M{"id": t.ID}, +func (ms *MongoStorage) SetTimingDrv(t *utils.TPTiming) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColTmg).UpdateOne(sctx, bson.M{"id": t.ID}, bson.M{"$set": t}, options.Update().SetUpsert(true), ) @@ -1452,8 +1427,8 @@ func (ms *MongoStorage) SetTimingDrv(t *utils.TPTiming) (err error) { }) } -func (ms *MongoStorage) RemoveTimingDrv(id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveTimingDrv(id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColTmg).DeleteOne(sctx, bson.M{"id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1463,26 +1438,23 @@ func (ms *MongoStorage) RemoveTimingDrv(id string) (err error) { } // GetStatQueueProfileDrv retrieves a StatQueueProfile from dataDB -func (ms *MongoStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *StatQueueProfile, err error) { - sq = new(StatQueueProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColSqp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(sq); err != nil { - sq = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetStatQueueProfileDrv(tenant string, id string) (*StatQueueProfile, error) { + sqProfile := new(StatQueueProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColSqp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(sqProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return sqProfile, err } // SetStatQueueProfileDrv stores a StatsQueue into DataDB -func (ms *MongoStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColSqp).UpdateOne(sctx, bson.M{"tenant": sq.Tenant, "id": sq.ID}, +func (ms *MongoStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColSqp).UpdateOne(sctx, bson.M{"tenant": sq.Tenant, "id": sq.ID}, bson.M{"$set": sq}, options.Update().SetUpsert(true), ) @@ -1491,8 +1463,8 @@ func (ms *MongoStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) } // RemStatQueueProfileDrv removes a StatsQueue from dataDB -func (ms *MongoStorage) RemStatQueueProfileDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemStatQueueProfileDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColSqp).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1502,34 +1474,31 @@ func (ms *MongoStorage) RemStatQueueProfileDrv(tenant, id string) (err error) { } // GetStatQueueDrv retrieves a StoredStatQueue -func (ms *MongoStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) { +func (ms *MongoStorage) GetStatQueueDrv(tenant, id string) (*StatQueue, error) { ssq := new(StoredStatQueue) - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColSqs).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(ssq); err != nil { - sq = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColSqs).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(ssq) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { - return + return decodeErr + }) + if err != nil { + return nil, err } - sq, err = ssq.AsStatQueue(ms.ms) - return + return ssq.AsStatQueue(ms.ms) } // SetStatQueueDrv stores the metrics for a StoredStatQueue func (ms *MongoStorage) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) { if ssq == nil { if ssq, err = NewStoredStatQueue(sq, ms.ms); err != nil { - return + return err } } - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColSqs).UpdateOne(sctx, bson.M{"tenant": ssq.Tenant, "id": ssq.ID}, + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColSqs).UpdateOne(sctx, bson.M{"tenant": ssq.Tenant, "id": ssq.ID}, bson.M{"$set": ssq}, options.Update().SetUpsert(true), ) @@ -1538,8 +1507,8 @@ func (ms *MongoStorage) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (er } // RemStatQueueDrv removes stored metrics for a StoredStatQueue -func (ms *MongoStorage) RemStatQueueDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemStatQueueDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColSqs).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1549,24 +1518,21 @@ func (ms *MongoStorage) RemStatQueueDrv(tenant, id string) (err error) { } // GetThresholdProfileDrv retrieves a ThresholdProfile from dataDB -func (ms *MongoStorage) GetThresholdProfileDrv(tenant, ID string) (tp *ThresholdProfile, err error) { - tp = new(ThresholdProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColTps).FindOne(sctx, bson.M{"tenant": tenant, "id": ID}) - if err := cur.Decode(tp); err != nil { - tp = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetThresholdProfileDrv(tenant, ID string) (*ThresholdProfile, error) { + thProfile := new(ThresholdProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColTps).FindOne(sctx, bson.M{"tenant": tenant, "id": ID}) + decodeErr := sr.Decode(thProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return thProfile, err } // SetThresholdProfileDrv stores a ThresholdProfile into DataDB -func (ms *MongoStorage) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) { +func (ms *MongoStorage) SetThresholdProfileDrv(tp *ThresholdProfile) error { return ms.query(func(sctx mongo.SessionContext) (err error) { _, err = ms.getCol(ColTps).UpdateOne(sctx, bson.M{"tenant": tp.Tenant, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true), @@ -1576,8 +1542,8 @@ func (ms *MongoStorage) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) } // RemoveThresholdProfile removes a ThresholdProfile from dataDB/cache -func (ms *MongoStorage) RemThresholdProfileDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemThresholdProfileDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColTps).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1586,25 +1552,22 @@ func (ms *MongoStorage) RemThresholdProfileDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err error) { - r = new(Threshold) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColThs).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - r = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetThresholdDrv(tenant, id string) (*Threshold, error) { + th := new(Threshold) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColThs).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(th) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return th, err } -func (ms *MongoStorage) SetThresholdDrv(r *Threshold) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColThs).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetThresholdDrv(r *Threshold) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColThs).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1612,8 +1575,8 @@ func (ms *MongoStorage) SetThresholdDrv(r *Threshold) (err error) { }) } -func (ms *MongoStorage) RemoveThresholdDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveThresholdDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColThs).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1622,26 +1585,25 @@ func (ms *MongoStorage) RemoveThresholdDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetFilterDrv(tenant, id string) (r *Filter, err error) { - r = new(Filter) - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColFlt).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetFilterDrv(tenant, id string) (*Filter, error) { + fltr := new(Filter) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColFlt).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(fltr) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { + return decodeErr + }) + if err != nil { return nil, err } - return + return fltr, err } -func (ms *MongoStorage) SetFilterDrv(r *Filter) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColFlt).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetFilterDrv(r *Filter) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColFlt).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1649,8 +1611,8 @@ func (ms *MongoStorage) SetFilterDrv(r *Filter) (err error) { }) } -func (ms *MongoStorage) RemoveFilterDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveFilterDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColFlt).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1659,25 +1621,22 @@ func (ms *MongoStorage) RemoveFilterDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetRouteProfileDrv(tenant, id string) (r *RouteProfile, err error) { - r = new(RouteProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColRts).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - r = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetRouteProfileDrv(tenant, id string) (*RouteProfile, error) { + routeProfile := new(RouteProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColRts).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(routeProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return routeProfile, err } -func (ms *MongoStorage) SetRouteProfileDrv(r *RouteProfile) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRts).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetRouteProfileDrv(r *RouteProfile) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColRts).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1685,8 +1644,8 @@ func (ms *MongoStorage) SetRouteProfileDrv(r *RouteProfile) (err error) { }) } -func (ms *MongoStorage) RemoveRouteProfileDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveRouteProfileDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColRts).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1695,25 +1654,22 @@ func (ms *MongoStorage) RemoveRouteProfileDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetAttributeProfileDrv(tenant, id string) (r *AttributeProfile, err error) { - r = new(AttributeProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColAttr).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - r = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetAttributeProfileDrv(tenant, id string) (*AttributeProfile, error) { + attrProfile := new(AttributeProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColAttr).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(attrProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return attrProfile, err } -func (ms *MongoStorage) SetAttributeProfileDrv(r *AttributeProfile) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColAttr).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetAttributeProfileDrv(r *AttributeProfile) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColAttr).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1721,8 +1677,8 @@ func (ms *MongoStorage) SetAttributeProfileDrv(r *AttributeProfile) (err error) }) } -func (ms *MongoStorage) RemoveAttributeProfileDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveAttributeProfileDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColAttr).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1731,25 +1687,22 @@ func (ms *MongoStorage) RemoveAttributeProfileDrv(tenant, id string) (err error) }) } -func (ms *MongoStorage) GetChargerProfileDrv(tenant, id string) (r *ChargerProfile, err error) { - r = new(ChargerProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColCpp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - r = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetChargerProfileDrv(tenant, id string) (*ChargerProfile, error) { + chargerProfile := new(ChargerProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColCpp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(chargerProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return chargerProfile, err } -func (ms *MongoStorage) SetChargerProfileDrv(r *ChargerProfile) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColCpp).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetChargerProfileDrv(r *ChargerProfile) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColCpp).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1757,8 +1710,8 @@ func (ms *MongoStorage) SetChargerProfileDrv(r *ChargerProfile) (err error) { }) } -func (ms *MongoStorage) RemoveChargerProfileDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveChargerProfileDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColCpp).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1767,25 +1720,22 @@ func (ms *MongoStorage) RemoveChargerProfileDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetDispatcherProfileDrv(tenant, id string) (r *DispatcherProfile, err error) { - r = new(DispatcherProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColDpp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - r = nil - if err == mongo.ErrNoDocuments { - return utils.ErrDSPProfileNotFound - } - return err +func (ms *MongoStorage) GetDispatcherProfileDrv(tenant, id string) (*DispatcherProfile, error) { + dspProfile := new(DispatcherProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColDpp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(dspProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrDSPProfileNotFound } - return nil + return decodeErr }) - return + return dspProfile, err } -func (ms *MongoStorage) SetDispatcherProfileDrv(r *DispatcherProfile) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColDpp).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetDispatcherProfileDrv(r *DispatcherProfile) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColDpp).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1793,8 +1743,8 @@ func (ms *MongoStorage) SetDispatcherProfileDrv(r *DispatcherProfile) (err error }) } -func (ms *MongoStorage) RemoveDispatcherProfileDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveDispatcherProfileDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColDpp).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1803,25 +1753,22 @@ func (ms *MongoStorage) RemoveDispatcherProfileDrv(tenant, id string) (err error }) } -func (ms *MongoStorage) GetDispatcherHostDrv(tenant, id string) (r *DispatcherHost, err error) { - r = new(DispatcherHost) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColDph).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - r = nil - if err == mongo.ErrNoDocuments { - return utils.ErrDSPHostNotFound - } - return err +func (ms *MongoStorage) GetDispatcherHostDrv(tenant, id string) (*DispatcherHost, error) { + dspHost := new(DispatcherHost) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColDph).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(dspHost) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrDSPHostNotFound } - return nil + return decodeErr }) - return + return dspHost, err } -func (ms *MongoStorage) SetDispatcherHostDrv(r *DispatcherHost) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColDph).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetDispatcherHostDrv(r *DispatcherHost) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColDph).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1829,8 +1776,8 @@ func (ms *MongoStorage) SetDispatcherHostDrv(r *DispatcherHost) (err error) { }) } -func (ms *MongoStorage) RemoveDispatcherHostDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveDispatcherHostDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColDph).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1839,43 +1786,43 @@ func (ms *MongoStorage) RemoveDispatcherHostDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetItemLoadIDsDrv(itemIDPrefix string) (loadIDs map[string]int64, err error) { +func (ms *MongoStorage) GetItemLoadIDsDrv(itemIDPrefix string) (map[string]int64, error) { fop := options.FindOne() if itemIDPrefix != "" { fop.SetProjection(bson.M{itemIDPrefix: 1, "_id": 0}) } else { fop.SetProjection(bson.M{"_id": 0}) } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColLID).FindOne(sctx, bson.D{}, fop) - if err := cur.Decode(&loadIDs); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + loadIDs := make(map[string]int64) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColLID).FindOne(sctx, bson.D{}, fop) + decodeErr := sr.Decode(&loadIDs) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { + return decodeErr + }) + if err != nil { return nil, err } if len(loadIDs) == 0 { return nil, utils.ErrNotFound } - return + return loadIDs, nil } -func (ms *MongoStorage) SetLoadIDsDrv(loadIDs map[string]int64) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColLID).UpdateOne(sctx, bson.D{}, bson.M{"$set": loadIDs}, +func (ms *MongoStorage) SetLoadIDsDrv(loadIDs map[string]int64) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColLID).UpdateOne(sctx, bson.D{}, bson.M{"$set": loadIDs}, options.Update().SetUpsert(true), ) return err }) } -func (ms *MongoStorage) RemoveLoadIDsDrv() (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColLID).DeleteMany(sctx, bson.M{}) +func (ms *MongoStorage) RemoveLoadIDsDrv() error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColLID).DeleteMany(sctx, bson.M{}) return err }) } @@ -1883,7 +1830,7 @@ func (ms *MongoStorage) RemoveLoadIDsDrv() (err error) { // GetIndexesDrv retrieves Indexes from dataDB // the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context // id is used as a concatenated key in case of filterIndexes the id will be filterType:fieldName:fieldVal -func (ms *MongoStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) { +func (ms *MongoStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (map[string]utils.StringSet, error) { type result struct { Key string Value []string @@ -1896,19 +1843,26 @@ func (ms *MongoStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexe for _, character := range []string{".", "*"} { dbKey = strings.Replace(dbKey, character, `\`+character, strings.Count(dbKey, character)) } - //inside bson.RegEx add carrot to match the prefix (optimization) - q = bson.M{"key": bsonx.Regex("^"+dbKey, utils.EmptyString)} + // For optimization, use a caret (^) in the regex pattern. + q = bson.M{"key": primitive.Regex{Pattern: "^" + dbKey}} } - indexes = make(map[string]utils.StringSet) - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(ColIndx).Find(sctx, q) - if err != nil { - return err + indexes := make(map[string]utils.StringSet) + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(ColIndx).Find(sctx, q) + if qryErr != nil { + return qryErr } + defer func() { + closeErr := cur.Close(sctx) + if closeErr != nil && qryErr == nil { + qryErr = closeErr + } + }() for cur.Next(sctx) { var elem result - if err := cur.Decode(&elem); err != nil { - return err + qryErr = cur.Decode(&elem) + if qryErr != nil { + return qryErr } if len(elem.Value) == 0 { continue @@ -1916,8 +1870,9 @@ func (ms *MongoStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexe indexKey := strings.TrimPrefix(elem.Key, utils.CacheInstanceToPrefix[idxItmType]+tntCtx+utils.ConcatenatedKeySep) indexes[indexKey] = utils.NewStringSet(elem.Value) } - return cur.Close(sctx) - }); err != nil { + return cur.Err() + }) + if err != nil { return nil, err } if len(indexes) == 0 { @@ -1929,7 +1884,7 @@ func (ms *MongoStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexe // SetIndexesDrv stores Indexes into DataDB // the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context func (ms *MongoStorage) SetIndexesDrv(idxItmType, tntCtx string, - indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) { + indexes map[string]utils.StringSet, commit bool, transactionID string) error { originKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx dbKey := originKey if transactionID != utils.EmptyString { @@ -1938,43 +1893,44 @@ func (ms *MongoStorage) SetIndexesDrv(idxItmType, tntCtx string, if commit && transactionID != utils.EmptyString { regexKey := dbKey for _, character := range []string{".", "*"} { - regexKey = strings.Replace(regexKey, character, `\`+character, strings.Count(regexKey, character)) + regexKey = strings.ReplaceAll(regexKey, character, `\`+character) } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - var result []string - result, err = ms.getField3(sctx, ColIndx, regexKey, "key") + err := ms.query(func(sctx mongo.SessionContext) error { + result, qryErr := ms.getAllIndexKeys(sctx, regexKey) for _, key := range result { idxKey := strings.TrimPrefix(key, dbKey) - if _, err = ms.getCol(ColIndx).DeleteOne(sctx, - bson.M{"key": originKey + idxKey}); err != nil { //ensure we do not have dup - return err + if _, qryErr = ms.getCol(ColIndx).DeleteOne(sctx, + bson.M{"key": originKey + idxKey}); qryErr != nil { //ensure we do not have dup + return qryErr } - if _, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": key}, + if _, qryErr = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": key}, bson.M{"$set": bson.M{"key": originKey + idxKey}}, // only update the key - ); err != nil { - return err + ); qryErr != nil { + return qryErr } } return nil - }); err != nil { + }) + if err != nil { return err } } var lastErr error for idxKey, itmMp := range indexes { - if err = ms.query(func(sctx mongo.SessionContext) (err error) { + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { idxDbkey := utils.ConcatenatedKey(dbKey, idxKey) if len(itmMp) == 0 { // remove from DB if we set it with empty indexes - _, err = ms.getCol(ColIndx).DeleteOne(sctx, + _, qryErr = ms.getCol(ColIndx).DeleteOne(sctx, bson.M{"key": idxDbkey}) } else { - _, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": idxDbkey}, + _, qryErr = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": idxDbkey}, bson.M{"$set": bson.M{"key": idxDbkey, "value": itmMp.AsSlice()}}, options.Update().SetUpsert(true), ) } - return err - }); err != nil { + return qryErr + }) + if err != nil { lastErr = err } } @@ -1982,9 +1938,9 @@ func (ms *MongoStorage) SetIndexesDrv(idxItmType, tntCtx string, } // RemoveIndexesDrv removes the indexes -func (ms *MongoStorage) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err error) { +func (ms *MongoStorage) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) error { if len(idxKey) != 0 { - return ms.query(func(sctx mongo.SessionContext) (err error) { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColIndx).DeleteOne(sctx, bson.M{"key": utils.ConcatenatedKey(utils.CacheInstanceToPrefix[idxItmType]+tntCtx, idxKey)}) if dr.DeletedCount == 0 { @@ -1995,11 +1951,15 @@ func (ms *MongoStorage) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err } regexKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx for _, character := range []string{".", "*"} { - regexKey = strings.Replace(regexKey, character, `\`+character, strings.Count(regexKey, character)) + regexKey = strings.ReplaceAll(regexKey, character, `\`+character) } - //inside bson.RegEx add carrot to match the prefix (optimization) - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColIndx).DeleteMany(sctx, bson.M{"key": bsonx.Regex("^"+regexKey, utils.EmptyString)}) + // For optimization, use a caret (^) in the regex pattern. + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColIndx).DeleteMany(sctx, bson.M{ + "key": primitive.Regex{ + Pattern: "^" + regexKey, + }, + }) return err }) } diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index 9e37f167a..02309c0ac 100644 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -20,6 +20,7 @@ package engine import ( "context" + "errors" "fmt" "regexp" "strings" @@ -28,9 +29,9 @@ import ( "github.com/cgrates/cgrates/utils" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/x/bsonx" ) func (ms *MongoStorage) GetTpIds(colName string) (tpids []string, err error) { @@ -79,7 +80,7 @@ func (ms *MongoStorage) GetTpIds(colName string) (tpids []string, err error) { return tpids, nil } -func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinct utils.TPDistinctIds, +func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinctIDs utils.TPDistinctIds, filter map[string]string, pag *utils.PaginatorWithSearch) ([]string, error) { findMap := bson.M{} if tpid != "" { @@ -95,11 +96,16 @@ func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinct utils.TPDisti if pag != nil { if pag.Search != "" { var searchItems []bson.M - for _, d := range distinct { - searchItems = append(searchItems, bson.M{d: bsonx.Regex(".*"+regexp.QuoteMeta(pag.Search)+".*", "")}) + for _, distinctID := range distinctIDs { + searchItems = append(searchItems, + bson.M{ + distinctID: primitive.Regex{ + Pattern: ".*" + regexp.QuoteMeta(pag.Search) + ".*", + }, + }, + ) } - // findMap["$and"] = []bson.M{{"$or": searchItems}} //before - findMap["$or"] = searchItems // after + findMap["$or"] = searchItems } if pag.Paginator != nil { if pag.Limit != nil { @@ -112,32 +118,31 @@ func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinct utils.TPDisti } selectors := bson.M{"_id": 0} - for i, d := range distinct { - if d == "tag" { // convert the tag used in SQL into id used here - distinct[i] = "id" + for i, distinctID := range distinctIDs { + if distinctID == "tag" { // convert the tag used in SQL into id used here + distinctIDs[i] = "id" } - selectors[distinct[i]] = 1 + selectors[distinctIDs[i]] = 1 } fop.SetProjection(selectors) distinctIds := make(utils.StringMap) - if err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(table).Find(sctx, findMap, fop) - if err != nil { - return err + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(table).Find(sctx, findMap, fop) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { - var elem bson.D - err := cur.Decode(&elem) + var item bson.M + err := cur.Decode(&item) if err != nil { return err } - item := elem.Map() var id string - last := len(distinct) - 1 - for i, d := range distinct { - if distinctValue, ok := item[d]; ok { + last := len(distinctIDs) - 1 + for i, distinctID := range distinctIDs { + if distinctValue, ok := item[distinctID]; ok { id += distinctValue.(string) } if i < last { @@ -147,7 +152,8 @@ func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinct utils.TPDisti distinctIds[id] = true } return cur.Close(sctx) - }); err != nil { + }) + if err != nil { return nil, err } return distinctIds.Slice(), nil @@ -158,26 +164,26 @@ func (ms *MongoStorage) GetTPTimings(tpid, id string) ([]*utils.ApierTPTiming, e if id != "" { filter["id"] = id } - var results []*utils.ApierTPTiming - err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(utils.TBLTPTimings).Find(sctx, filter) - if err != nil { - return err + var tpTimings []*utils.ApierTPTiming + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(utils.TBLTPTimings).Find(sctx, filter) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { var el utils.ApierTPTiming - err := cur.Decode(&el) - if err != nil { - return err + qryErr = cur.Decode(&el) + if qryErr != nil { + return qryErr } - results = append(results, &el) + tpTimings = append(tpTimings, &el) } - if len(results) == 0 { + if len(tpTimings) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) - return results, err + return tpTimings, err } func (ms *MongoStorage) GetTPDestinations(tpid, id string) ([]*utils.TPDestination, error) { @@ -185,26 +191,26 @@ func (ms *MongoStorage) GetTPDestinations(tpid, id string) ([]*utils.TPDestinati if id != "" { filter["id"] = id } - var results []*utils.TPDestination - err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(utils.TBLTPDestinations).Find(sctx, filter) - if err != nil { - return err + var tpDestinations []*utils.TPDestination + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(utils.TBLTPDestinations).Find(sctx, filter) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { var el utils.TPDestination - err := cur.Decode(&el) - if err != nil { - return err + qryErr = cur.Decode(&el) + if qryErr != nil { + return qryErr } - results = append(results, &el) + tpDestinations = append(tpDestinations, &el) } - if len(results) == 0 { + if len(tpDestinations) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) - return results, err + return tpDestinations, err } func (ms *MongoStorage) GetTPRates(tpid, id string) ([]*utils.TPRateRALs, error) { @@ -212,11 +218,11 @@ func (ms *MongoStorage) GetTPRates(tpid, id string) ([]*utils.TPRateRALs, error) if id != "" { filter["id"] = id } - var results []*utils.TPRateRALs - err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(utils.TBLTPRates).Find(sctx, filter) - if err != nil { - return err + var tpRates []*utils.TPRateRALs + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(utils.TBLTPRates).Find(sctx, filter) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { var el utils.TPRateRALs @@ -227,14 +233,14 @@ func (ms *MongoStorage) GetTPRates(tpid, id string) ([]*utils.TPRateRALs, error) for _, rs := range el.RateSlots { rs.SetDurations() } - results = append(results, &el) + tpRates = append(tpRates, &el) } - if len(results) == 0 { + if len(tpRates) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) - return results, err + return tpRates, err } func (ms *MongoStorage) GetTPDestinationRates(tpid, id string, pag *utils.Paginator) ([]*utils.TPDestinationRate, error) { @@ -242,7 +248,7 @@ func (ms *MongoStorage) GetTPDestinationRates(tpid, id string, pag *utils.Pagina if id != "" { filter["id"] = id } - var results []*utils.TPDestinationRate + var tpDstRates []*utils.TPDestinationRate fop := options.Find() if pag != nil { if pag.Limit != nil { @@ -252,25 +258,25 @@ func (ms *MongoStorage) GetTPDestinationRates(tpid, id string, pag *utils.Pagina fop = fop.SetSkip(int64(*pag.Offset)) } } - err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(utils.TBLTPDestinationRates).Find(sctx, filter, fop) - if err != nil { - return err + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(utils.TBLTPDestinationRates).Find(sctx, filter, fop) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { var el utils.TPDestinationRate - err := cur.Decode(&el) - if err != nil { - return err + qryErr = cur.Decode(&el) + if qryErr != nil { + return qryErr } - results = append(results, &el) + tpDstRates = append(tpDstRates, &el) } - if len(results) == 0 { + if len(tpDstRates) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) - return results, err + return tpDstRates, err } func (ms *MongoStorage) GetTPRatingPlans(tpid, id string, pag *utils.Paginator) ([]*utils.TPRatingPlan, error) { @@ -278,7 +284,7 @@ func (ms *MongoStorage) GetTPRatingPlans(tpid, id string, pag *utils.Paginator) if id != "" { filter["id"] = id } - var results []*utils.TPRatingPlan + var tpRatingPlans []*utils.TPRatingPlan fop := options.Find() if pag != nil { if pag.Limit != nil { @@ -288,25 +294,25 @@ func (ms *MongoStorage) GetTPRatingPlans(tpid, id string, pag *utils.Paginator) fop = fop.SetSkip(int64(*pag.Offset)) } } - err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(utils.TBLTPRatingPlans).Find(sctx, filter, fop) - if err != nil { - return err + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(utils.TBLTPRatingPlans).Find(sctx, filter, fop) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { var el utils.TPRatingPlan - err := cur.Decode(&el) - if err != nil { - return err + qryErr = cur.Decode(&el) + if qryErr != nil { + return qryErr } - results = append(results, &el) + tpRatingPlans = append(tpRatingPlans, &el) } - if len(results) == 0 { + if len(tpRatingPlans) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) - return results, err + return tpRatingPlans, err } func (ms *MongoStorage) GetTPRatingProfiles(tp *utils.TPRatingProfile) ([]*utils.TPRatingProfile, error) { @@ -323,26 +329,26 @@ func (ms *MongoStorage) GetTPRatingProfiles(tp *utils.TPRatingProfile) ([]*utils if tp.LoadId != "" { filter["loadid"] = tp.LoadId } - var results []*utils.TPRatingProfile - err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(utils.TBLTPRatingProfiles).Find(sctx, filter) - if err != nil { - return err + var tpRatingProfiles []*utils.TPRatingProfile + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(utils.TBLTPRatingProfiles).Find(sctx, filter) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { var el utils.TPRatingProfile - err := cur.Decode(&el) - if err != nil { - return err + qryErr = cur.Decode(&el) + if qryErr != nil { + return qryErr } - results = append(results, &el) + tpRatingProfiles = append(tpRatingProfiles, &el) } - if len(results) == 0 { + if len(tpRatingProfiles) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) - return results, err + return tpRatingProfiles, err } func (ms *MongoStorage) GetTPSharedGroups(tpid, id string) ([]*utils.TPSharedGroups, error) { @@ -551,7 +557,7 @@ func (ms *MongoStorage) GetTPAccountActions(tp *utils.TPAccountActions) ([]*util } func (ms *MongoStorage) RemTpData(table, tpid string, args map[string]string) error { - if len(table) == 0 { // Remove tpid out of all tables + if table == utils.EmptyString { // Remove tpid out of all tables return ms.query(func(sctx mongo.SessionContext) error { col, err := ms.DB().ListCollections(sctx, bson.D{}, options.ListCollections().SetNameOnly(true)) if err != nil { @@ -890,7 +896,9 @@ func (ms *MongoStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri filter[OriginHostLow] = originHost } if originIDPrefix != "" { - filter[OriginIDLow] = bsonx.Regex(fmt.Sprintf("^%s", originIDPrefix), "") + filter[OriginIDLow] = primitive.Regex{ + Pattern: "^" + originIDPrefix, + } } err = ms.query(func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(utils.SessionCostsTBL).Find(sctx, filter) @@ -934,7 +942,7 @@ func (ms *MongoStorage) RemoveSMCosts(qryFltr *utils.SMCostFilter) error { func (ms *MongoStorage) SetCDR(cdr *CDR, allowUpdate bool) error { if cdr.OrderID == 0 { - cdr.OrderID = ms.cnter.Next() + cdr.OrderID = ms.counter.Next() } return ms.query(func(sctx mongo.SessionContext) (err error) { if allowUpdate { @@ -984,20 +992,22 @@ func (ms *MongoStorage) cleanEmptyFilters(filters bson.M) { } // _, err := col(ColCDRs).UpdateAll(bson.M{CGRIDLow: bson.M{"$in": cgrIds}}, bson.M{"$set": bson.M{"deleted_at": time.Now()}}) -func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, int64, error) { +func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) (cdrs []*CDR, n int64, err error) { var minUsage, maxUsage *time.Duration - if len(qryFltr.MinUsage) != 0 { - if parsed, err := utils.ParseDurationWithNanosecs(qryFltr.MinUsage); err != nil { + if qryFltr.MinUsage != utils.EmptyString { + parsedDur, err := utils.ParseDurationWithNanosecs(qryFltr.MinUsage) + if err != nil { return nil, 0, err } else { - minUsage = &parsed + minUsage = &parsedDur } } - if len(qryFltr.MaxUsage) != 0 { - if parsed, err := utils.ParseDurationWithNanosecs(qryFltr.MaxUsage); err != nil { + if qryFltr.MaxUsage != utils.EmptyString { + parsedDur, err := utils.ParseDurationWithNanosecs(qryFltr.MaxUsage) + if err != nil { return nil, 0, err } else { - maxUsage = &parsed + maxUsage = &parsedDur } } filters := bson.M{ @@ -1018,20 +1028,20 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, CreatedAtLow: bson.M{"$gte": qryFltr.CreatedAtStart, "$lt": qryFltr.CreatedAtEnd}, UpdatedAtLow: bson.M{"$gte": qryFltr.UpdatedAtStart, "$lt": qryFltr.UpdatedAtEnd}, UsageLow: bson.M{"$gte": minUsage, "$lt": maxUsage}, - //CostDetailsLow + "." + AccountLow: bson.M{"$in": qryFltr.RatedAccounts, "$nin": qryFltr.NotRatedAccounts}, - //CostDetailsLow + "." + SubjectLow: bson.M{"$in": qryFltr.RatedSubjects, "$nin": qryFltr.NotRatedSubjects}, + // CostDetailsLow + "." + AccountLow: bson.M{"$in": qryFltr.RatedAccounts, "$nin": qryFltr.NotRatedAccounts}, + // CostDetailsLow + "." + SubjectLow: bson.M{"$in": qryFltr.RatedSubjects, "$nin": qryFltr.NotRatedSubjects}, } - //file, _ := os.TempFile(os.TempDir(), "debug") - //file.WriteString(fmt.Sprintf("FILTER: %v\n", utils.ToIJSON(qryFltr))) - //file.WriteString(fmt.Sprintf("BEFORE: %v\n", utils.ToIJSON(filters))) + // file, _ := os.TempFile(os.TempDir(), "debug") + // file.WriteString(fmt.Sprintf("FILTER: %v\n", utils.ToIJSON(qryFltr))) + // file.WriteString(fmt.Sprintf("BEFORE: %v\n", utils.ToIJSON(filters))) ms.cleanEmptyFilters(filters) if len(qryFltr.DestinationPrefixes) != 0 { var regexpRule string for _, prefix := range qryFltr.DestinationPrefixes { - if len(prefix) == 0 { + if prefix == utils.EmptyString { continue } - if len(regexpRule) != 0 { + if regexpRule != utils.EmptyString { regexpRule += "|" } regexpRule += "^(" + regexp.QuoteMeta(prefix) + ")" @@ -1039,17 +1049,29 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, if _, hasIt := filters["$and"]; !hasIt { filters["$and"] = make([]bson.M, 0) } - filters["$and"] = append(filters["$and"].([]bson.M), bson.M{DestinationLow: bsonx.Regex(regexpRule, "")}) // $and gathers all rules not fitting top level query + // The "$and" operator is used to include additional query conditions that cannot be + // represented at the top level of the query. + filters["$and"] = append(filters["$and"].([]bson.M), bson.M{ + DestinationLow: primitive.Regex{ + Pattern: regexpRule, + }, + }) } if len(qryFltr.NotDestinationPrefixes) != 0 { if _, hasIt := filters["$and"]; !hasIt { filters["$and"] = make([]bson.M, 0) } for _, prefix := range qryFltr.NotDestinationPrefixes { - if len(prefix) == 0 { + if prefix == utils.EmptyString { continue } - filters["$and"] = append(filters["$and"].([]bson.M), bson.M{DestinationLow: bsonx.Regex("^(?!"+prefix+")", "")}) + filters["$and"] = append(filters["$and"].([]bson.M), + bson.M{ + DestinationLow: primitive.Regex{ + Pattern: "^(?!" + prefix + ")", + }, + }, + ) } } @@ -1094,19 +1116,18 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, filters[CostLow] = bson.M{"$lt": *qryFltr.MaxCost} } } - //file.WriteString(fmt.Sprintf("AFTER: %v\n", utils.ToIJSON(filters))) - //file.Close() + // file.WriteString(fmt.Sprintf("AFTER: %v\n", utils.ToIJSON(filters))) + // file.Close() if remove { - var chgd int64 - err := ms.query(func(sctx mongo.SessionContext) (err error) { - dr, err := ms.getCol(ColCDRs).DeleteMany(sctx, filters) - if err != nil { - return err + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + dr, qryErr := ms.getCol(ColCDRs).DeleteMany(sctx, filters) + if qryErr != nil { + return qryErr } - chgd = dr.DeletedCount - return err + n = dr.DeletedCount + return qryErr }) - return nil, chgd, err + return nil, n, err } fop := options.Find() cop := options.Count() @@ -1139,26 +1160,26 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, case utils.Cost: orderVal += "cost" default: - return nil, 0, fmt.Errorf("Invalid value : %s", separateVals[0]) + return nil, 0, fmt.Errorf("invalid value : %s", separateVals[0]) } fop = fop.SetSort(bson.M{orderVal: ordVal}) } if qryFltr.Count { var cnt int64 - if err := ms.query(func(sctx mongo.SessionContext) (err error) { - cnt, err = ms.getCol(ColCDRs).CountDocuments(sctx, filters, cop) - return err - }); err != nil { + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cnt, qryErr = ms.getCol(ColCDRs).CountDocuments(sctx, filters, cop) + return qryErr + }) + if err != nil { return nil, 0, err } return nil, cnt, nil } // Execute query - var cdrs []*CDR - err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(ColCDRs).Find(sctx, filters, fop) - if err != nil { - return err + err = ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(ColCDRs).Find(sctx, filters, fop) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { cdr := CDR{} @@ -1532,37 +1553,40 @@ func (ms *MongoStorage) SetTPDispatcherHosts(tpDPPs []*utils.TPDispatcherHost) ( }) } -func (ms *MongoStorage) GetVersions(itm string) (vrs Versions, err error) { +func (ms *MongoStorage) GetVersions(itm string) (Versions, error) { fop := options.FindOne() if itm != "" { fop.SetProjection(bson.M{itm: 1, "_id": 0}) } else { fop.SetProjection(bson.M{"_id": 0}) } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColVer).FindOne(sctx, bson.D{}, fop) - if err := cur.Decode(&vrs); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + var vrs Versions + err := ms.query(func(sctx mongo.SessionContext) (err error) { + sr := ms.getCol(ColVer).FindOne(sctx, bson.D{}, fop) + decodeErr := sr.Decode(&vrs) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { + return decodeErr + }) + if err != nil { return nil, err } if len(vrs) == 0 { return nil, utils.ErrNotFound } - return + return vrs, nil } -func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) (err error) { +func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) error { if overwrite { - ms.RemoveVersions(nil) + err := ms.RemoveVersions(nil) + if err != nil && !errors.Is(err, utils.ErrNotFound) { + return err + } } - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColVer).UpdateOne(sctx, bson.D{}, bson.M{"$set": vrs}, + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColVer).UpdateOne(sctx, bson.D{}, bson.M{"$set": vrs}, options.Update().SetUpsert(true), ) return err @@ -1575,23 +1599,22 @@ func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) (err error) { // _, err = col.Upsert(bson.M{}, bson.M{"$set": &vrs}) } -func (ms *MongoStorage) RemoveVersions(vrs Versions) (err error) { +func (ms *MongoStorage) RemoveVersions(vrs Versions) error { if len(vrs) == 0 { - return ms.query(func(sctx mongo.SessionContext) (err error) { - var dr *mongo.DeleteResult - dr, err = ms.getCol(ColVer).DeleteOne(sctx, bson.D{}) + return ms.query(func(sctx mongo.SessionContext) error { + dr, err := ms.getCol(ColVer).DeleteOne(sctx, bson.D{}) if err != nil { - return + return err } if dr.DeletedCount == 0 { return utils.ErrNotFound } - return + return nil }) } - return ms.query(func(sctx mongo.SessionContext) (err error) { + return ms.query(func(sctx mongo.SessionContext) error { for k := range vrs { - if _, err = ms.getCol(ColVer).UpdateOne(sctx, bson.D{}, bson.M{"$unset": bson.M{k: 1}}, + if _, err := ms.getCol(ColVer).UpdateOne(sctx, bson.D{}, bson.M{"$unset": bson.M{k: 1}}, options.Update().SetUpsert(true)); err != nil { return err } diff --git a/engine/storage_utils.go b/engine/storage_utils.go index f97b388f9..02f9cc508 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -20,6 +20,9 @@ package engine import ( "fmt" + "net" + "net/url" + "path" "strconv" "strings" "time" @@ -81,6 +84,24 @@ func NewStorDBConn(dbType, host, port, name, user, pass, marshaler string, return } +func buildURL(scheme, host, port, db, user, pass string) (*url.URL, error) { + u, err := url.Parse("//" + host) + if err != nil { + return nil, err + } + if port != "0" { + u.Host = net.JoinHostPort(u.Host, port) + } + if user != "" && pass != "" { + u.User = url.UserPassword(user, pass) + } + if db != "" { + u.Path = path.Join(u.Path, db) + } + u.Scheme = scheme + return u, nil +} + // SMCost stores one Cost coming from SM type SMCost struct { CGRID string diff --git a/go.mod b/go.mod index b128ce9e4..9a198b7a5 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/peterh/liner v1.2.2 github.com/rabbitmq/amqp091-go v1.9.0 github.com/segmentio/kafka-go v0.4.44 - go.mongodb.org/mongo-driver v1.11.0 + go.mongodb.org/mongo-driver v1.12.1 golang.org/x/crypto v0.14.0 golang.org/x/net v0.17.0 golang.org/x/oauth2 v0.13.0 @@ -94,7 +94,6 @@ require ( github.com/jinzhu/now v1.1.5 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.17.1 // indirect - github.com/kr/pretty v0.2.1 // indirect github.com/lib/pq v1.8.0 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/minio/highwayhash v1.0.2 // indirect @@ -105,7 +104,6 @@ require ( github.com/nats-io/nuid v1.0.1 // indirect github.com/philhofer/fwd v1.1.1 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/steveyen/gtreap v0.1.0 // indirect github.com/syndtr/goleveldb v1.0.0 // indirect diff --git a/go.sum b/go.sum index 91214e768..17d816eaf 100644 --- a/go.sum +++ b/go.sum @@ -253,11 +253,8 @@ github.com/kljensen/snowball v0.6.0/go.mod h1:27N7E8fVU5H68RlUmnWwZCfxgt4POBJfEN github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= @@ -366,7 +363,6 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= @@ -377,8 +373,6 @@ github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpP github.com/tebeka/snowball v0.4.2/go.mod h1:4IfL14h1lvwZcp1sfXuuc7/7yCsvVffTWxWxCLfFpYg= github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c h1:g+WoO5jjkqGAzHWCjJB1zZfXPIAaDpzXIEJ0eS6B5Ok= github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c/go.mod h1:ahpPrc7HpcfEWDQRZEmnXMzHY03mLDYMCxeDzy46i+8= -github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= -github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0= @@ -390,10 +384,8 @@ github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= -github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= -github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= @@ -404,8 +396,8 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= -go.mongodb.org/mongo-driver v1.11.0 h1:FZKhBSTydeuffHj9CBjXlR8vQLee1cQyTWYPA6/tqiE= -go.mongodb.org/mongo-driver v1.11.0/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8= +go.mongodb.org/mongo-driver v1.12.1 h1:nLkghSU8fQNaK7oUmDhQFsnrtcoNy7Z6LVFKsEecqgE= +go.mongodb.org/mongo-driver v1.12.1/go.mod h1:/rGBTebI3XYboVmgz+Wv3Bcbl3aD0QF9zl6kDDw18rQ= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -476,7 +468,6 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=