diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 9f5c8d41f..d94d57c3f 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -22,6 +22,7 @@ import ( "bytes" "compress/zlib" "context" + "errors" "fmt" "io" "reflect" @@ -38,14 +39,12 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/bsoncodec" "go.mongodb.org/mongo-driver/bson/bsonrw" - "go.mongodb.org/mongo-driver/bson/bsontype" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/x/bsonx" ) -// Mongo collections names +// Collection names in MongoDB. const ( ColDst = "destinations" ColRds = "reverse_destinations" @@ -100,48 +99,43 @@ var ( DestinationLow = strings.ToLower(utils.Destination) CostLow = strings.ToLower(utils.Cost) CostSourceLow = strings.ToLower(utils.CostSource) - - tTime = reflect.TypeOf(time.Time{}) - decimalType = reflect.TypeOf(utils.Decimal{}) ) -func TimeDecodeValue1(dc bsoncodec.DecodeContext, vr bsonrw.ValueReader, val reflect.Value) error { - if vr.Type() != bsontype.DateTime { - return fmt.Errorf("cannot decode %v into a time.Time", vr.Type()) +func decimalEncoder(ec bsoncodec.EncodeContext, vw bsonrw.ValueWriter, val reflect.Value) error { + decimalType := reflect.TypeOf(utils.Decimal{}) + + // All encoder implementations should check that val is valid and is of + // the correct type before proceeding. + if !val.IsValid() || val.Type() != decimalType { + return bsoncodec.ValueEncoderError{ + Name: "decimalEncoder", + Types: []reflect.Type{decimalType}, + Received: val, + } } - dt, err := vr.ReadDateTime() + sls, err := val.Interface().(utils.Decimal).MarshalText() if err != nil { return err } - if !val.CanSet() || val.Type() != tTime { - return bsoncodec.ValueDecoderError{Name: "TimeDecodeValue", Types: []reflect.Type{tTime}, Received: val} - } - val.Set(reflect.ValueOf(time.Unix(dt/1000, dt%1000*1000000).UTC())) - return nil -} - -func DecimalEncoder(ec bsoncodec.EncodeContext, vw bsonrw.ValueWriter, val reflect.Value) error { - if val.Kind() != reflect.Struct { - return bsoncodec.ValueEncoderError{Name: "DecimalEncoder", Kinds: []reflect.Kind{reflect.Struct}, Received: val} - } - d, ok := val.Interface().(utils.Decimal) - if !ok { - return fmt.Errorf("cannot cast <%+v> to ", val.Interface()) - } - sls, err := d.MarshalText() - if err != nil { - return err - } return vw.WriteBinary(sls) } -func DecimalDecoder(ec bsoncodec.DecodeContext, vw bsonrw.ValueReader, val reflect.Value) error { - if !val.CanSet() || val.Type() != decimalType { - return bsoncodec.ValueEncoderError{Name: "DecimalDecoder", Kinds: []reflect.Kind{reflect.Struct}, Received: val} +func decimalDecoder(dc bsoncodec.DecodeContext, vr bsonrw.ValueReader, val reflect.Value) error { + decimalType := reflect.TypeOf(utils.Decimal{}) + + // All decoder implementations should check that val is valid, settable, + // and is of the correct kind before proceeding. + if !val.IsValid() || !val.CanSet() || val.Type() != decimalType { + return bsoncodec.ValueDecoderError{ + Name: "decimalDecoder", + Types: []reflect.Type{decimalType}, + Received: val, + } } - data, _, err := vw.ReadBinary() + + data, _, err := vr.ReadBinary() if err != nil { return err } @@ -153,95 +147,88 @@ func DecimalDecoder(ec bsoncodec.DecodeContext, vw bsonrw.ValueReader, val refle return nil } -// NewMongoStorage givese new mongo driver -func NewMongoStorage(host, port, db, user, pass, mrshlerStr, storageType string, - cdrsIndexes []string, ttl time.Duration) (ms *MongoStorage, err error) { - url := host - if port != "0" { - url += ":" + port - } - if user != "" && pass != "" { - url = fmt.Sprintf("%s:%s@%s", user, pass, url) - } - var dbName string - if db != "" { - url += "/" + db - dbName = strings.Split(db, "?")[0] // remove extra info after ? - } - ctx := context.Background() - url = "mongodb://" + url - reg := bson.NewRegistryBuilder() - reg.RegisterDecoder(tTime, bsoncodec.ValueDecoderFunc(TimeDecodeValue1)) - reg.RegisterTypeEncoder(decimalType, bsoncodec.ValueEncoderFunc(DecimalEncoder)) - reg.RegisterTypeDecoder(decimalType, bsoncodec.ValueDecoderFunc(DecimalDecoder)) - opt := options.Client(). - ApplyURI(url). - SetRegistry(reg.Build()). - SetServerSelectionTimeout(ttl). - SetRetryWrites(false) // set this option to false because as default it is on true - - client, err := mongo.NewClient(opt) - // client, err := mongo.NewClient(url) - +// NewMongoStorage initializes a new MongoDB storage instance with provided connection parameters and settings. +// Returns an error if the setup fails. +func NewMongoStorage(host, port, db, user, pass, mrshlerStr string, storageType string, + cdrsIndexes []string, ttl time.Duration) (*MongoStorage, error) { + url, err := buildURL("mongodb", host, port, db, user, pass) if err != nil { return nil, err } - err = client.Connect(ctx) - if err != nil { - return nil, err - } - mrshler, err := NewMarshaler(mrshlerStr) - if err != nil { - return nil, err - } - - ms = &MongoStorage{ - client: client, - ctx: ctx, + mongoStorage := &MongoStorage{ + ctx: context.TODO(), ctxTTL: ttl, - db: dbName, - storageType: storageType, - ms: mrshler, cdrsIndexes: cdrsIndexes, + storageType: storageType, + counter: utils.NewCounter(time.Now().UnixNano(), 0), + } + reg := bson.NewRegistry() + decimalType := reflect.TypeOf(utils.Decimal{}) + reg.RegisterTypeEncoder(decimalType, bsoncodec.ValueEncoderFunc(decimalEncoder)) + reg.RegisterTypeDecoder(decimalType, bsoncodec.ValueDecoderFunc(decimalDecoder)) + // serverAPI := options.ServerAPI(options.ServerAPIVersion1).SetStrict(true).SetDeprecationErrors(true) + opts := options.Client(). + ApplyURI(url.String()). + SetRegistry(reg). + SetServerSelectionTimeout(mongoStorage.ctxTTL). + SetRetryWrites(false) // default is true + // SetServerAPIOptions(serverAPI) + + // Create a new client and connect to the server + mongoStorage.client, err = mongo.Connect(mongoStorage.ctx, opts) + if err != nil { + return nil, err } - if err = ms.query(func(sctx mongo.SessionContext) error { - cols, err := ms.client.Database(dbName).ListCollectionNames(sctx, bson.D{}) + mongoStorage.ms, err = NewMarshaler(mrshlerStr) + if err != nil { + return nil, err + } + if db != "" { + // Populate ms.db with the url path after trimming everything after '?'. + mongoStorage.db = strings.Split(db, "?")[0] + } + + err = mongoStorage.query(func(sctx mongo.SessionContext) error { + // Create indexes only if the database is empty or only the version table is present. + cols, err := mongoStorage.client.Database(mongoStorage.db). + ListCollectionNames(sctx, bson.D{}) if err != nil { return err } empty := true - for _, col := range cols { // create indexes only if database is empty or only version table is present + for _, col := range cols { if col != ColVer { empty = false break } } if empty { - return ms.EnsureIndexes() + return mongoStorage.EnsureIndexes() } return nil - }); err != nil { + }) + + if err != nil { return nil, err } - ms.cnter = utils.NewCounter(time.Now().UnixNano(), 0) - return + return mongoStorage, nil } -// MongoStorage struct for new mongo driver +// MongoStorage represents a storage interface for the new MongoDB driver. type MongoStorage struct { client *mongo.Client ctx context.Context ctxTTL time.Duration ctxTTLMutex sync.RWMutex // used for TTL reload db string - storageType string // datadb, stordb + storageType string // DataDB/StorDB ms Marshaler cdrsIndexes []string - cnter *utils.Counter + counter *utils.Counter } -func (ms *MongoStorage) query(argfunc func(ctx mongo.SessionContext) error) (err error) { +func (ms *MongoStorage) query(argfunc func(ctx mongo.SessionContext) error) error { ms.ctxTTLMutex.RLock() ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL) ms.ctxTTLMutex.RUnlock() @@ -249,12 +236,12 @@ func (ms *MongoStorage) query(argfunc func(ctx mongo.SessionContext) error) (err return ms.client.UseSession(ctxSession, argfunc) } -// IsDataDB returns if the storeage is used for DataDb +// IsDataDB returns whether or not the storage is used for DataDB. func (ms *MongoStorage) IsDataDB() bool { return ms.storageType == utils.DataDB } -// SetTTL set the context TTL used for queries (is thread safe) +// SetTTL sets the context TTL used for queries (Thread-safe). func (ms *MongoStorage) SetTTL(ttl time.Duration) { ms.ctxTTLMutex.Lock() ms.ctxTTL = ttl @@ -264,14 +251,14 @@ func (ms *MongoStorage) SetTTL(ttl time.Duration) { func (ms *MongoStorage) enusureIndex(colName string, uniq bool, keys ...string) error { return ms.query(func(sctx mongo.SessionContext) error { col := ms.getCol(colName) - io := options.Index().SetUnique(uniq) + indexOptions := options.Index().SetUnique(uniq) doc := make(bson.D, 0) for _, k := range keys { doc = append(doc, bson.E{Key: k, Value: 1}) } _, err := col.Indexes().CreateOne(sctx, mongo.IndexModel{ Keys: doc, - Options: io, + Options: indexOptions, }) return err }) @@ -289,38 +276,36 @@ func (ms *MongoStorage) getCol(col string) *mongo.Collection { return ms.client.Database(ms.db).Collection(col) } -// GetContext returns the context used for the current DB +// GetContext returns the context used for the current database. func (ms *MongoStorage) GetContext() context.Context { return ms.ctx } func isNotFound(err error) bool { - de, ok := err.(mongo.CommandError) - if !ok { // if still can't converted to the mongo.CommandError check if error do not contains message - return strings.Contains(err.Error(), "ns not found") + var de *mongo.CommandError + + if errors.As(err, &de) { + return de.Code == 26 || de.Message == "ns not found" } - return de.Code == 26 || de.Message == "ns not found" + + // If the error cannot be converted to mongo.CommandError + // check if the error message contains "ns not found" + return strings.Contains(err.Error(), "ns not found") } -func (ms *MongoStorage) ensureIndexesForCol(col string) (err error) { // exported for migrator - if err = ms.dropAllIndexesForCol(col); err != nil && !isNotFound(err) { // make sure you do not have indexes - return +func (ms *MongoStorage) ensureIndexesForCol(col string) error { // exported for migrator + err := ms.dropAllIndexesForCol(col) + if err != nil && !isNotFound(err) { // make sure you do not have indexes + return err } - err = nil switch col { case ColAct, ColApl, ColAAp, ColAtr, ColRpl, ColDst, ColRds, ColLht, ColIndx: - if err = ms.enusureIndex(col, true, "key"); err != nil { - return - } + err = ms.enusureIndex(col, true, "key") case ColRsP, ColRes, ColSqs, ColSqp, ColTps, ColThs, ColRts, ColAttr, ColFlt, ColCpp, ColDpp, ColDph: - if err = ms.enusureIndex(col, true, "tenant", "id"); err != nil { - return - } + err = ms.enusureIndex(col, true, "tenant", "id") case ColRpf, ColShg, ColAcc: - if err = ms.enusureIndex(col, true, "id"); err != nil { - return - } - //StorDB + err = ms.enusureIndex(col, true, "id") + // StorDB case utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPDestinationRates, utils.TBLTPRatingPlans, utils.TBLTPSharedGroups, utils.TBLTPActions, @@ -328,105 +313,89 @@ func (ms *MongoStorage) ensureIndexesForCol(col string) (err error) { // exporte utils.TBLTPStats, utils.TBLTPResources, utils.TBLTPDispatchers, utils.TBLTPDispatcherHosts, utils.TBLTPChargers, utils.TBLTPRoutes, utils.TBLTPThresholds: - if err = ms.enusureIndex(col, true, "tpid", "id"); err != nil { - return - } + err = ms.enusureIndex(col, true, "tpid", "id") case utils.TBLTPRatingProfiles: - if err = ms.enusureIndex(col, true, "tpid", "tenant", - "category", "subject", "loadid"); err != nil { - return + err = ms.enusureIndex(col, true, "tpid", "tenant", + "category", "subject", "loadid") + case utils.SessionCostsTBL: + err = ms.enusureIndex(col, true, CGRIDLow, RunIDLow) + if err == nil { + err = ms.enusureIndex(col, false, OriginHostLow, OriginIDLow) + } + if err == nil { + err = ms.enusureIndex(col, false, RunIDLow, OriginIDLow) } case utils.CDRsTBL: - if err = ms.enusureIndex(col, true, CGRIDLow, RunIDLow, - OriginIDLow); err != nil { - return - } - for _, idxKey := range ms.cdrsIndexes { - if err = ms.enusureIndex(col, false, idxKey); err != nil { - return + err = ms.enusureIndex(col, true, CGRIDLow, RunIDLow, + OriginIDLow) + if err == nil { + for _, idxKey := range ms.cdrsIndexes { + err = ms.enusureIndex(col, false, idxKey) + if err != nil { + break + } } } - case utils.SessionCostsTBL: - if err = ms.enusureIndex(col, true, CGRIDLow, - RunIDLow); err != nil { - return - } - if err = ms.enusureIndex(col, false, OriginHostLow, - OriginIDLow); err != nil { - return - } - if err = ms.enusureIndex(col, false, RunIDLow, - OriginIDLow); err != nil { - return - } } - return + return err } -// EnsureIndexes creates db indexes -func (ms *MongoStorage) EnsureIndexes(cols ...string) (err error) { - if len(cols) != 0 { - for _, col := range cols { - if err = ms.ensureIndexesForCol(col); err != nil { - return +// EnsureIndexes creates database indexes for the specified collections. +func (ms *MongoStorage) EnsureIndexes(cols ...string) error { + if len(cols) == 0 { + if ms.IsDataDB() { + cols = []string{ + ColAct, ColApl, ColAAp, ColAtr, ColRpl, ColDst, ColRds, ColLht, ColIndx, + ColRsP, ColRes, ColSqs, ColSqp, ColTps, ColThs, ColRts, ColAttr, ColFlt, ColCpp, + ColDpp, ColRpf, ColShg, ColAcc, } - } - return - } - if ms.storageType == utils.DataDB { - for _, col := range []string{ColAct, ColApl, ColAAp, ColAtr, - ColRpl, ColDst, ColRds, ColLht, ColIndx, ColRsP, ColRes, ColSqs, ColSqp, - ColTps, ColThs, ColRts, ColAttr, ColFlt, ColCpp, ColDpp, - ColRpf, ColShg, ColAcc} { - if err = ms.ensureIndexesForCol(col); err != nil { - return + } else { + cols = []string{ + utils.TBLTPTimings, utils.TBLTPDestinations, utils.TBLTPDestinationRates, + utils.TBLTPRatingPlans, utils.TBLTPSharedGroups, utils.TBLTPActions, utils.TBLTPActionPlans, + utils.TBLTPActionTriggers, utils.TBLTPStats, utils.TBLTPResources, utils.TBLTPRatingProfiles, + utils.CDRsTBL, utils.SessionCostsTBL, } } } - if ms.storageType == utils.StorDB { - for _, col := range []string{utils.TBLTPTimings, utils.TBLTPDestinations, - utils.TBLTPDestinationRates, utils.TBLTPRatingPlans, - utils.TBLTPSharedGroups, utils.TBLTPActions, - utils.TBLTPActionPlans, utils.TBLTPActionTriggers, - utils.TBLTPStats, utils.TBLTPResources, - utils.TBLTPRatingProfiles, utils.CDRsTBL, utils.SessionCostsTBL} { - if err = ms.ensureIndexesForCol(col); err != nil { - return - } + for _, col := range cols { + if err := ms.ensureIndexesForCol(col); err != nil { + return err } } - return + return nil } -// Close disconnects the client +// Close disconnects the MongoDB client. func (ms *MongoStorage) Close() { if err := ms.client.Disconnect(ms.ctx); err != nil { utils.Logger.Err(fmt.Sprintf(" Error on disconnect:%s", err)) } } -// Flush drops the datatable -func (ms *MongoStorage) Flush(ignore string) (err error) { +// Flush drops the datatable and recreates the indexes. +func (ms *MongoStorage) Flush(_ string) (err error) { return ms.query(func(sctx mongo.SessionContext) error { - if err = ms.client.Database(ms.db).Drop(sctx); err != nil { + err := ms.client.Database(ms.db).Drop(sctx) + if err != nil { return err } - return ms.EnsureIndexes() // recreate the indexes + return ms.EnsureIndexes() }) } -// DB returnes a database object +// DB returns the database object associated with the MongoDB client. func (ms *MongoStorage) DB() *mongo.Database { return ms.client.Database(ms.db) } -// SelectDatabase selects the database -func (ms *MongoStorage) SelectDatabase(dbName string) (err error) { +// SelectDatabase selects the specified database. +func (ms *MongoStorage) SelectDatabase(dbName string) error { ms.db = dbName - return + return nil } -func (ms *MongoStorage) RemoveKeysForPrefix(prefix string) (err error) { +func (ms *MongoStorage) RemoveKeysForPrefix(prefix string) error { var colName string switch prefix { case utils.DestinationPrefix: @@ -483,8 +452,8 @@ func (ms *MongoStorage) RemoveKeysForPrefix(prefix string) (err error) { }) } -// IsDBEmpty implementation -func (ms *MongoStorage) IsDBEmpty() (resp bool, err error) { +// IsDBEmpty checks if the database is empty by verifying if each collection is empty. +func (ms *MongoStorage) IsDBEmpty() (isEmpty bool, err error) { err = ms.query(func(sctx mongo.SessionContext) error { cols, err := ms.DB().ListCollectionNames(sctx, bson.D{}) if err != nil { @@ -494,26 +463,33 @@ func (ms *MongoStorage) IsDBEmpty() (resp bool, err error) { if col == utils.CDRsTBL { // ignore cdrs collection continue } - var count int64 - if count, err = ms.getCol(col).CountDocuments(sctx, bson.D{}, options.Count().SetLimit(1)); err != nil { // check if collection is empty so limit the count to 1 + count, err := ms.getCol(col).CountDocuments(sctx, bson.D{}, options.Count().SetLimit(1)) // limiting the count to 1 since we are only checking if the collection is empty + if err != nil { return err } if count != 0 { return nil } } - resp = true + isEmpty = true return nil }) - return resp, err + return isEmpty, err } -func (ms *MongoStorage) getField(sctx mongo.SessionContext, col, prefix, subject, field string) (result []string, err error) { - fieldResult := bson.D{} +func (ms *MongoStorage) getAllKeysMatchingField(sctx mongo.SessionContext, col, prefix, + subject, field string) (keys []string, err error) { + fieldResult := bson.M{} iter, err := ms.getCol(col).Find(sctx, - bson.M{field: bsonx.Regex(subject, "")}, + bson.M{ + field: primitive.Regex{ + Pattern: subject, + }, + }, options.Find().SetProjection( - bson.M{field: 1}, + bson.M{ + field: 1, + }, ), ) if err != nil { @@ -524,19 +500,22 @@ func (ms *MongoStorage) getField(sctx mongo.SessionContext, col, prefix, subject if err != nil { return } - result = append(result, prefix+fieldResult.Map()[field].(string)) + keys = append(keys, prefix+fieldResult[field].(string)) } - return result, iter.Close(sctx) + return keys, iter.Close(sctx) } -func (ms *MongoStorage) getField2(sctx mongo.SessionContext, col, prefix, subject string, tntID *utils.TenantID) (result []string, err error) { - idResult := struct{ Tenant, Id string }{} +func (ms *MongoStorage) getAllKeysMatchingTenantID(sctx mongo.SessionContext, col, prefix, + subject string, tntID *utils.TenantID) (keys []string, err error) { + idResult := struct{ Tenant, ID string }{} elem := bson.M{} if tntID.Tenant != "" { elem["tenant"] = tntID.Tenant } if tntID.ID != "" { - elem["id"] = bsonx.Regex(subject, "") + elem["id"] = primitive.Regex{ + Pattern: subject, + } } iter, err := ms.getCol(col).Find(sctx, elem, options.Find().SetProjection(bson.M{"tenant": 1, "id": 1}), @@ -549,17 +528,21 @@ func (ms *MongoStorage) getField2(sctx mongo.SessionContext, col, prefix, subjec if err != nil { return } - result = append(result, prefix+utils.ConcatenatedKey(idResult.Tenant, idResult.Id)) + keys = append(keys, prefix+utils.ConcatenatedKey(idResult.Tenant, idResult.ID)) } - return result, iter.Close(sctx) + return keys, iter.Close(sctx) } -func (ms *MongoStorage) getField3(sctx mongo.SessionContext, col, prefix, field string) (result []string, err error) { - fieldResult := bson.D{} - iter, err := ms.getCol(col).Find(sctx, - bson.M{field: bsonx.Regex(fmt.Sprintf("^%s", prefix), "")}, +func (ms *MongoStorage) getAllIndexKeys(sctx mongo.SessionContext, prefix string) (keys []string, err error) { + fieldResult := bson.M{} + iter, err := ms.getCol(ColIndx).Find(sctx, + bson.M{ + "key": primitive.Regex{ + Pattern: "^" + prefix, + }, + }, options.Find().SetProjection( - bson.M{field: 1}, + bson.M{"key": 1}, ), ) if err != nil { @@ -570,96 +553,98 @@ func (ms *MongoStorage) getField3(sctx mongo.SessionContext, col, prefix, field if err != nil { return } - result = append(result, fieldResult.Map()[field].(string)) + keys = append(keys, fieldResult["key"].(string)) } - return result, iter.Close(sctx) + return keys, iter.Close(sctx) } -// GetKeysForPrefix implementation -func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err error) { +// GetKeysForPrefix retrieves keys matching the specified prefix across different categories. +func (ms *MongoStorage) GetKeysForPrefix(prefix string) (keys []string, err error) { var category, subject string keyLen := len(utils.DestinationPrefix) if len(prefix) < keyLen { - return nil, fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix) + return nil, fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix) } category = prefix[:keyLen] // prefix length tntID := utils.NewTenantID(prefix[keyLen:]) - subject = fmt.Sprintf("^%s", prefix[keyLen:]) // old way, no tenant support - err = ms.query(func(sctx mongo.SessionContext) (err error) { + subject = "^" + prefix[keyLen:] // old way, no tenant support + err = ms.query(func(sctx mongo.SessionContext) error { + var qryErr error switch category { case utils.DestinationPrefix: - result, err = ms.getField(sctx, ColDst, utils.DestinationPrefix, subject, "key") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColDst, utils.DestinationPrefix, subject, "key") case utils.ReverseDestinationPrefix: - result, err = ms.getField(sctx, ColRds, utils.ReverseDestinationPrefix, subject, "key") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColRds, utils.ReverseDestinationPrefix, subject, "key") case utils.RatingPlanPrefix: - result, err = ms.getField(sctx, ColRpl, utils.RatingPlanPrefix, subject, "key") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColRpl, utils.RatingPlanPrefix, subject, "key") case utils.RatingProfilePrefix: if strings.HasPrefix(prefix[keyLen:], utils.MetaOut) { - subject = fmt.Sprintf("^\\%s", prefix[keyLen:]) // rewrite the id cause it start with * from `*out` + // Rewrite the id as it starts with '*' (from "*out"). + subject = "^\\" + prefix[keyLen:] } - result, err = ms.getField(sctx, ColRpf, utils.RatingProfilePrefix, subject, "id") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColRpf, utils.RatingProfilePrefix, subject, "id") case utils.ActionPrefix: - result, err = ms.getField(sctx, ColAct, utils.ActionPrefix, subject, "key") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColAct, utils.ActionPrefix, subject, "key") case utils.ActionPlanPrefix: - result, err = ms.getField(sctx, ColApl, utils.ActionPlanPrefix, subject, "key") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColApl, utils.ActionPlanPrefix, subject, "key") case utils.ActionTriggerPrefix: - result, err = ms.getField(sctx, ColAtr, utils.ActionTriggerPrefix, subject, "key") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColAtr, utils.ActionTriggerPrefix, subject, "key") case utils.SharedGroupPrefix: - result, err = ms.getField(sctx, ColShg, utils.SharedGroupPrefix, subject, "id") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColShg, utils.SharedGroupPrefix, subject, "id") case utils.AccountPrefix: - result, err = ms.getField(sctx, ColAcc, utils.AccountPrefix, subject, "id") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColAcc, utils.AccountPrefix, subject, "id") case utils.ResourceProfilesPrefix: - result, err = ms.getField2(sctx, ColRsP, utils.ResourceProfilesPrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColRsP, utils.ResourceProfilesPrefix, subject, tntID) case utils.ResourcesPrefix: - result, err = ms.getField2(sctx, ColRes, utils.ResourcesPrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColRes, utils.ResourcesPrefix, subject, tntID) case utils.StatQueuePrefix: - result, err = ms.getField2(sctx, ColSqs, utils.StatQueuePrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColSqs, utils.StatQueuePrefix, subject, tntID) case utils.StatQueueProfilePrefix: - result, err = ms.getField2(sctx, ColSqp, utils.StatQueueProfilePrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColSqp, utils.StatQueueProfilePrefix, subject, tntID) case utils.AccountActionPlansPrefix: - result, err = ms.getField(sctx, ColAAp, utils.AccountActionPlansPrefix, subject, "key") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColAAp, utils.AccountActionPlansPrefix, subject, "key") case utils.TimingsPrefix: - result, err = ms.getField(sctx, ColTmg, utils.TimingsPrefix, subject, "id") + keys, qryErr = ms.getAllKeysMatchingField(sctx, ColTmg, utils.TimingsPrefix, subject, "id") case utils.FilterPrefix: - result, err = ms.getField2(sctx, ColFlt, utils.FilterPrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColFlt, utils.FilterPrefix, subject, tntID) case utils.ThresholdPrefix: - result, err = ms.getField2(sctx, ColThs, utils.ThresholdPrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColThs, utils.ThresholdPrefix, subject, tntID) case utils.ThresholdProfilePrefix: - result, err = ms.getField2(sctx, ColTps, utils.ThresholdProfilePrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColTps, utils.ThresholdProfilePrefix, subject, tntID) case utils.RouteProfilePrefix: - result, err = ms.getField2(sctx, ColRts, utils.RouteProfilePrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColRts, utils.RouteProfilePrefix, subject, tntID) case utils.AttributeProfilePrefix: - result, err = ms.getField2(sctx, ColAttr, utils.AttributeProfilePrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColAttr, utils.AttributeProfilePrefix, subject, tntID) case utils.ChargerProfilePrefix: - result, err = ms.getField2(sctx, ColCpp, utils.ChargerProfilePrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColCpp, utils.ChargerProfilePrefix, subject, tntID) case utils.DispatcherProfilePrefix: - result, err = ms.getField2(sctx, ColDpp, utils.DispatcherProfilePrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColDpp, utils.DispatcherProfilePrefix, subject, tntID) case utils.DispatcherHostPrefix: - result, err = ms.getField2(sctx, ColDph, utils.DispatcherHostPrefix, subject, tntID) + keys, qryErr = ms.getAllKeysMatchingTenantID(sctx, ColDph, utils.DispatcherHostPrefix, subject, tntID) case utils.AttributeFilterIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.AttributeFilterIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.AttributeFilterIndexes) case utils.ResourceFilterIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.ResourceFilterIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.ResourceFilterIndexes) case utils.StatFilterIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.StatFilterIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.StatFilterIndexes) case utils.ThresholdFilterIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.ThresholdFilterIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.ThresholdFilterIndexes) case utils.RouteFilterIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.RouteFilterIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.RouteFilterIndexes) case utils.ChargerFilterIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.ChargerFilterIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.ChargerFilterIndexes) case utils.DispatcherFilterIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.DispatcherFilterIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.DispatcherFilterIndexes) case utils.ActionPlanIndexes: - result, err = ms.getField3(sctx, ColIndx, utils.ActionPlanIndexes, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.ActionPlanIndexes) case utils.FilterIndexPrfx: - result, err = ms.getField3(sctx, ColIndx, utils.FilterIndexPrfx, "key") + keys, qryErr = ms.getAllIndexKeys(sctx, utils.FilterIndexPrfx) default: - err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix) + qryErr = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix) } - return err + return qryErr }) - return + return keys, err } func (ms *MongoStorage) HasDataDrv(category, subject, tenant string) (has bool, err error) { @@ -711,21 +696,20 @@ func (ms *MongoStorage) HasDataDrv(category, subject, tenant string) (has bool, return has, err } -func (ms *MongoStorage) GetRatingPlanDrv(key string) (rp *RatingPlan, err error) { +func (ms *MongoStorage) GetRatingPlanDrv(key string) (*RatingPlan, error) { var kv struct { Key string Value []byte } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColRpl).FindOne(sctx, bson.M{"key": key}) - if err := cur.Decode(&kv); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) (err error) { + sr := ms.getCol(ColRpl).FindOne(sctx, bson.M{"key": key}) + decodeErr := sr.Decode(&kv) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { + return decodeErr + }) + if err != nil { return nil, err } @@ -738,11 +722,13 @@ func (ms *MongoStorage) GetRatingPlanDrv(key string) (rp *RatingPlan, err error) if err != nil { return nil, err } - r.Close() - if err = ms.ms.Unmarshal(out, &rp); err != nil { + err = r.Close() + if err != nil { return nil, err } - return + var ratingPlan *RatingPlan + err = ms.ms.Unmarshal(out, &ratingPlan) + return ratingPlan, err } func (ms *MongoStorage) SetRatingPlanDrv(rp *RatingPlan) error { @@ -750,12 +736,19 @@ func (ms *MongoStorage) SetRatingPlanDrv(rp *RatingPlan) error { if err != nil { return err } + var b bytes.Buffer w := zlib.NewWriter(&b) - w.Write(result) - w.Close() - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRpl).UpdateOne(sctx, bson.M{"key": rp.Id}, + _, err = w.Write(result) + if err != nil { + return err + } + err = w.Close() + if err != nil { + return err + } + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColRpl).UpdateOne(sctx, bson.M{"key": rp.Id}, bson.M{"$set": struct { Key string Value []byte @@ -767,7 +760,7 @@ func (ms *MongoStorage) SetRatingPlanDrv(rp *RatingPlan) error { } func (ms *MongoStorage) RemoveRatingPlanDrv(key string) error { - return ms.query(func(sctx mongo.SessionContext) (err error) { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColRpl).DeleteMany(sctx, bson.M{"key": key}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -776,25 +769,22 @@ func (ms *MongoStorage) RemoveRatingPlanDrv(key string) error { }) } -func (ms *MongoStorage) GetRatingProfileDrv(key string) (rp *RatingProfile, err error) { - rp = new(RatingProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColRpf).FindOne(sctx, bson.M{"id": key}) - if err := cur.Decode(rp); err != nil { - rp = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetRatingProfileDrv(key string) (*RatingProfile, error) { + rtProfile := new(RatingProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColRpf).FindOne(sctx, bson.M{"id": key}) + decodeErr := sr.Decode(rtProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return rtProfile, err } -func (ms *MongoStorage) SetRatingProfileDrv(rp *RatingProfile) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRpf).UpdateOne(sctx, bson.M{"id": rp.Id}, +func (ms *MongoStorage) SetRatingProfileDrv(rp *RatingProfile) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColRpf).UpdateOne(sctx, bson.M{"id": rp.Id}, bson.M{"$set": rp}, options.Update().SetUpsert(true), ) @@ -803,7 +793,7 @@ func (ms *MongoStorage) SetRatingProfileDrv(rp *RatingProfile) (err error) { } func (ms *MongoStorage) RemoveRatingProfileDrv(key string) error { - return ms.query(func(sctx mongo.SessionContext) (err error) { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColRpf).DeleteMany(sctx, bson.M{"id": key}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -812,21 +802,22 @@ func (ms *MongoStorage) RemoveRatingProfileDrv(key string) error { }) } -func (ms *MongoStorage) GetDestinationDrv(key, transactionID string) (result *Destination, err error) { +func (ms *MongoStorage) GetDestinationDrv(key, transactionID string) (*Destination, error) { var kv struct { Key string Value []byte } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColDst).FindOne(sctx, bson.M{"key": key}) - if err := cur.Decode(&kv); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) (err error) { + sr := ms.getCol(ColDst).FindOne(sctx, bson.M{"key": key}) + decodeErr := sr.Decode(&kv) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + Cache.Set(utils.CacheDestinations, key, nil, nil, + cacheCommit(transactionID), transactionID) + return utils.ErrNotFound } - return nil - }); err != nil { + return decodeErr + }) + if err != nil { return nil, err } b := bytes.NewBuffer(kv.Value) @@ -838,21 +829,31 @@ func (ms *MongoStorage) GetDestinationDrv(key, transactionID string) (result *De if err != nil { return nil, err } - r.Close() - err = ms.ms.Unmarshal(out, &result) - return + err = r.Close() + if err != nil { + return nil, err + } + var dst *Destination + err = ms.ms.Unmarshal(out, &dst) + return dst, err } -func (ms *MongoStorage) SetDestinationDrv(dest *Destination, transactionID string) (err error) { +func (ms *MongoStorage) SetDestinationDrv(dest *Destination, _ string) error { result, err := ms.ms.Marshal(dest) if err != nil { return err } var b bytes.Buffer w := zlib.NewWriter(&b) - w.Write(result) - w.Close() - return ms.query(func(sctx mongo.SessionContext) (err error) { + _, err = w.Write(result) + if err != nil { + return err + } + err = w.Close() + if err != nil { + return err + } + return ms.query(func(sctx mongo.SessionContext) error { _, err = ms.getCol(ColDst).UpdateOne(sctx, bson.M{"key": dest.Id}, bson.M{"$set": struct { Key string @@ -865,8 +866,8 @@ func (ms *MongoStorage) SetDestinationDrv(dest *Destination, transactionID strin } func (ms *MongoStorage) RemoveDestinationDrv(destID string, - transactionID string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { + transactionID string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColDst).DeleteOne(sctx, bson.M{"key": destID}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -875,74 +876,68 @@ func (ms *MongoStorage) RemoveDestinationDrv(destID string, }) } -func (ms *MongoStorage) RemoveReverseDestinationDrv(dstID, prfx, transactionID string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRds).UpdateOne(sctx, bson.M{"key": prfx}, +func (ms *MongoStorage) RemoveReverseDestinationDrv(dstID, prfx, transactionID string) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColRds).UpdateOne(sctx, bson.M{"key": prfx}, bson.M{"$pull": bson.M{"value": dstID}}) return err }) } -func (ms *MongoStorage) GetReverseDestinationDrv(prefix, transactionID string) (ids []string, err error) { +func (ms *MongoStorage) GetReverseDestinationDrv(prefix, transactionID string) ([]string, error) { var result struct { Key string Value []string } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColRds).FindOne(sctx, bson.M{"key": prefix}) - if err := cur.Decode(&result); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColRds).FindOne(sctx, bson.M{"key": prefix}) + decodeErr := sr.Decode(&result) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { + return decodeErr + }) + if err != nil { return nil, err } - ids = result.Value - return + return result.Value, nil } -func (ms *MongoStorage) SetReverseDestinationDrv(destID string, prefixes []string, transactionID string) (err error) { +func (ms *MongoStorage) SetReverseDestinationDrv(destID string, prefixes []string, _ string) error { for _, p := range prefixes { - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRds).UpdateOne(sctx, bson.M{"key": p}, + err := ms.query(func(sctx mongo.SessionContext) error { + _, qryErr := ms.getCol(ColRds).UpdateOne(sctx, bson.M{"key": p}, bson.M{"$addToSet": bson.M{"value": destID}}, options.Update().SetUpsert(true), ) - return err - }); err != nil { + return qryErr + }) + if err != nil { return err } } return nil } -func (ms *MongoStorage) GetActionsDrv(key string) (as Actions, err error) { +func (ms *MongoStorage) GetActionsDrv(key string) (Actions, error) { var result struct { Key string Value Actions } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColAct).FindOne(sctx, bson.M{"key": key}) - if err := cur.Decode(&result); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColAct).FindOne(sctx, bson.M{"key": key}) + decodeErr := sr.Decode(&result) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { - return nil, err - } - as = result.Value - return + return decodeErr + }) + return result.Value, err } func (ms *MongoStorage) SetActionsDrv(key string, as Actions) error { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColAct).UpdateOne(sctx, bson.M{"key": key}, + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColAct).UpdateOne(sctx, bson.M{"key": key}, bson.M{"$set": struct { Key string Value Actions @@ -954,7 +949,7 @@ func (ms *MongoStorage) SetActionsDrv(key string, as Actions) error { } func (ms *MongoStorage) RemoveActionsDrv(key string) error { - return ms.query(func(sctx mongo.SessionContext) (err error) { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColAct).DeleteOne(sctx, bson.M{"key": key}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -963,25 +958,22 @@ func (ms *MongoStorage) RemoveActionsDrv(key string) error { }) } -func (ms *MongoStorage) GetSharedGroupDrv(key string) (sg *SharedGroup, err error) { - sg = new(SharedGroup) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColShg).FindOne(sctx, bson.M{"id": key}) - if err := cur.Decode(sg); err != nil { - sg = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetSharedGroupDrv(key string) (*SharedGroup, error) { + sg := new(SharedGroup) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColShg).FindOne(sctx, bson.M{"id": key}) + decodeErr := sr.Decode(sg) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return sg, err } -func (ms *MongoStorage) SetSharedGroupDrv(sg *SharedGroup) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColShg).UpdateOne(sctx, bson.M{"id": sg.Id}, +func (ms *MongoStorage) SetSharedGroupDrv(sg *SharedGroup) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColShg).UpdateOne(sctx, bson.M{"id": sg.Id}, bson.M{"$set": sg}, options.Update().SetUpsert(true), ) @@ -989,8 +981,8 @@ func (ms *MongoStorage) SetSharedGroupDrv(sg *SharedGroup) (err error) { }) } -func (ms *MongoStorage) RemoveSharedGroupDrv(id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveSharedGroupDrv(id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColShg).DeleteOne(sctx, bson.M{"id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -999,20 +991,17 @@ func (ms *MongoStorage) RemoveSharedGroupDrv(id string) (err error) { }) } -func (ms *MongoStorage) GetAccountDrv(key string) (result *Account, err error) { - result = new(Account) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColAcc).FindOne(sctx, bson.M{"id": key}) - if err := cur.Decode(result); err != nil { - result = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetAccountDrv(key string) (*Account, error) { + acc := new(Account) + err := ms.query(func(sctx mongo.SessionContext) (err error) { + sr := ms.getCol(ColAcc).FindOne(sctx, bson.M{"id": key}) + decodeErr := sr.Decode(acc) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return acc, err } func (ms *MongoStorage) SetAccountDrv(acc *Account) error { @@ -1020,7 +1009,8 @@ func (ms *MongoStorage) SetAccountDrv(acc *Account) error { // UPDATE: if all balances expired and were cleaned it makes // sense to write empty balance map if len(acc.BalanceMap) == 0 { - if ac, err := ms.GetAccountDrv(acc.ID); err == nil && !ac.allBalancesExpired() { + ac, err := ms.GetAccountDrv(acc.ID) + if err == nil && !ac.allBalancesExpired() { ac.ActionTriggers = acc.ActionTriggers ac.UnitCounters = acc.UnitCounters ac.AllowNegative = acc.AllowNegative @@ -1029,8 +1019,8 @@ func (ms *MongoStorage) SetAccountDrv(acc *Account) error { } } acc.UpdateTime = time.Now() - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColAcc).UpdateOne(sctx, bson.M{"id": acc.ID}, + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColAcc).UpdateOne(sctx, bson.M{"id": acc.ID}, bson.M{"$set": acc}, options.Update().SetUpsert(true), ) @@ -1038,8 +1028,8 @@ func (ms *MongoStorage) SetAccountDrv(acc *Account) error { }) } -func (ms *MongoStorage) RemoveAccountDrv(key string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveAccountDrv(key string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColAcc).DeleteOne(sctx, bson.M{"id": key}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1048,16 +1038,20 @@ func (ms *MongoStorage) RemoveAccountDrv(key string) (err error) { }) } -// Limit will only retrieve the last n items out of history, newest first +// GetLoadHistory retrieves the last n items from the load history, newest first. func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool, - transactionID string) (loadInsts []*utils.LoadInstance, err error) { + transactionID string) ([]*utils.LoadInstance, error) { if limit == 0 { return nil, nil } if !skipCache { - if x, ok := Cache.Get(utils.LoadInstKey, ""); ok { + x, ok := Cache.Get(utils.LoadInstKey, "") + if ok { if x != nil { - items := x.([]*utils.LoadInstance) + items, ok := x.([]*utils.LoadInstance) + if !ok { + return nil, utils.ErrCastFailed + } if len(items) < limit || limit == -1 { return items, nil } @@ -1070,66 +1064,61 @@ func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool, Key string Value []*utils.LoadInstance } - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColLht).FindOne(sctx, bson.M{"key": utils.LoadInstKey}) - if err := cur.Decode(&kv); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColLht).FindOne(sctx, bson.M{"key": utils.LoadInstKey}) + decodeErr := sr.Decode(&kv) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) cCommit := cacheCommit(transactionID) if err == nil { - loadInsts = kv.Value if errCh := Cache.Remove(utils.LoadInstKey, "", cCommit, transactionID); errCh != nil { return nil, errCh } - if errCh := Cache.Set(utils.LoadInstKey, "", loadInsts, nil, cCommit, transactionID); errCh != nil { + if errCh := Cache.Set(utils.LoadInstKey, "", kv.Value, nil, cCommit, transactionID); errCh != nil { return nil, errCh } } - if len(loadInsts) < limit || limit == -1 { - return loadInsts, nil + if len(kv.Value) < limit || limit == -1 { + return kv.Value, nil } - return loadInsts[:limit], nil + return kv.Value[:limit], nil } -// Adds a single load instance to load history +// AddLoadHistory adds a single load instance to the load history. func (ms *MongoStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize int, transactionID string) error { if loadHistSize == 0 { // Load history disabled return nil } - // get existing load history + // Get existing load history. var existingLoadHistory []*utils.LoadInstance var kv struct { Key string Value []*utils.LoadInstance } - if err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColLht).FindOne(sctx, bson.M{"key": utils.LoadInstKey}) - if err := cur.Decode(&kv); err != nil { - if err == mongo.ErrNoDocuments { - return nil // utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColLht).FindOne(sctx, bson.M{"key": utils.LoadInstKey}) + decodeErr := sr.Decode(&kv) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return nil // utils.ErrNotFound } - return nil - }); err != nil { - return err - } + return decodeErr + }) if kv.Value != nil { existingLoadHistory = kv.Value } - err := guardian.Guardian.Guard(func() error { // Make sure we do it locked since other instance can modify history while we read it - // insert on first position + + // Make sure we do it locked since other instances can modify the history while we read it. + err = guardian.Guardian.Guard(func() error { + + // Insert at the first position. existingLoadHistory = append(existingLoadHistory, nil) copy(existingLoadHistory[1:], existingLoadHistory[0:]) existingLoadHistory[0] = ldInst - //check length histLen := len(existingLoadHistory) if histLen >= loadHistSize { // Have hit maximum history allowed, remove oldest element in order to add new one existingLoadHistory = existingLoadHistory[:loadHistSize] @@ -1153,36 +1142,31 @@ func (ms *MongoStorage) AddLoadHistory(ldInst *utils.LoadInstance, return err } -func (ms *MongoStorage) GetActionTriggersDrv(key string) (atrs ActionTriggers, err error) { +func (ms *MongoStorage) GetActionTriggersDrv(key string) (ActionTriggers, error) { var kv struct { Key string Value ActionTriggers } - if err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColAtr).FindOne(sctx, bson.M{"key": key}) - if err := cur.Decode(&kv); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColAtr).FindOne(sctx, bson.M{"key": key}) + decodeErr := sr.Decode(&kv) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { - return nil, err - } - atrs = kv.Value - return + return decodeErr + }) + return kv.Value, err } -func (ms *MongoStorage) SetActionTriggersDrv(key string, atrs ActionTriggers) (err error) { +func (ms *MongoStorage) SetActionTriggersDrv(key string, atrs ActionTriggers) error { if len(atrs) == 0 { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColAtr).DeleteOne(sctx, bson.M{"key": key}) + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColAtr).DeleteOne(sctx, bson.M{"key": key}) return err }) } - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColAtr).UpdateOne(sctx, bson.M{"key": key}, + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColAtr).UpdateOne(sctx, bson.M{"key": key}, bson.M{"$set": struct { Key string Value ActionTriggers @@ -1195,7 +1179,7 @@ func (ms *MongoStorage) SetActionTriggersDrv(key string, atrs ActionTriggers) (e func (ms *MongoStorage) RemoveActionTriggersDrv(key string) error { - return ms.query(func(sctx mongo.SessionContext) (err error) { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColAtr).DeleteOne(sctx, bson.M{"key": key}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1204,21 +1188,20 @@ func (ms *MongoStorage) RemoveActionTriggersDrv(key string) error { }) } -func (ms *MongoStorage) GetActionPlanDrv(key string) (ats *ActionPlan, err error) { +func (ms *MongoStorage) GetActionPlanDrv(key string) (*ActionPlan, error) { var kv struct { Key string Value []byte } - if err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColApl).FindOne(sctx, bson.M{"key": key}) - if err := cur.Decode(&kv); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColApl).FindOne(sctx, bson.M{"key": key}) + decodeErr := sr.Decode(&kv) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { + return decodeErr + }) + if err != nil { return nil, err } b := bytes.NewBuffer(kv.Value) @@ -1230,22 +1213,32 @@ func (ms *MongoStorage) GetActionPlanDrv(key string) (ats *ActionPlan, err error if err != nil { return nil, err } - r.Close() - err = ms.ms.Unmarshal(out, &ats) - return + err = r.Close() + if err != nil { + return nil, err + } + var ap *ActionPlan + err = ms.ms.Unmarshal(out, &ap) + return ap, err } -func (ms *MongoStorage) SetActionPlanDrv(key string, ats *ActionPlan) (err error) { +func (ms *MongoStorage) SetActionPlanDrv(key string, ats *ActionPlan) error { result, err := ms.ms.Marshal(ats) if err != nil { return err } var b bytes.Buffer w := zlib.NewWriter(&b) - w.Write(result) - w.Close() - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColApl).UpdateOne(sctx, bson.M{"key": key}, + _, err = w.Write(result) + if err != nil { + return err + } + err = w.Close() + if err != nil { + return err + } + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColApl).UpdateOne(sctx, bson.M{"key": key}, bson.M{"$set": struct { Key string Value []byte @@ -1257,13 +1250,13 @@ func (ms *MongoStorage) SetActionPlanDrv(key string, ats *ActionPlan) (err error } func (ms *MongoStorage) RemoveActionPlanDrv(key string) error { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColApl).DeleteOne(sctx, bson.M{"key": key}) + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColApl).DeleteOne(sctx, bson.M{"key": key}) return err }) } -func (ms *MongoStorage) GetAllActionPlansDrv() (ats map[string]*ActionPlan, err error) { +func (ms *MongoStorage) GetAllActionPlansDrv() (map[string]*ActionPlan, error) { keys, err := ms.GetKeysForPrefix(utils.ActionPlanPrefix) if err != nil { return nil, err @@ -1271,41 +1264,36 @@ func (ms *MongoStorage) GetAllActionPlansDrv() (ats map[string]*ActionPlan, err if len(keys) == 0 { return nil, utils.ErrNotFound } - ats = make(map[string]*ActionPlan, len(keys)) + actionPlans := make(map[string]*ActionPlan, len(keys)) for _, key := range keys { ap, err := ms.GetActionPlanDrv(key[len(utils.ActionPlanPrefix):]) if err != nil { return nil, err } - ats[key[len(utils.ActionPlanPrefix):]] = ap + actionPlans[key[len(utils.ActionPlanPrefix):]] = ap } - return + return actionPlans, nil } -func (ms *MongoStorage) GetAccountActionPlansDrv(acntID string) (aPlIDs []string, err error) { +func (ms *MongoStorage) GetAccountActionPlansDrv(acntID string) ([]string, error) { var kv struct { Key string Value []string } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColAAp).FindOne(sctx, bson.M{"key": acntID}) - if err := cur.Decode(&kv); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColAAp).FindOne(sctx, bson.M{"key": acntID}) + decodeErr := sr.Decode(&kv) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { - return nil, err - } - aPlIDs = kv.Value - return + return decodeErr + }) + return kv.Value, err } -func (ms *MongoStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColAAp).UpdateOne(sctx, bson.M{"key": acntID}, +func (ms *MongoStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColAAp).UpdateOne(sctx, bson.M{"key": acntID}, bson.M{"$set": struct { Key string Value []string @@ -1317,8 +1305,8 @@ func (ms *MongoStorage) SetAccountActionPlansDrv(acntID string, aPlIDs []string) } // ToDo: check return len(aPlIDs) == 0 -func (ms *MongoStorage) RemAccountActionPlansDrv(acntID string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemAccountActionPlansDrv(acntID string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColAAp).DeleteOne(sctx, bson.M{"key": acntID}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1334,45 +1322,38 @@ func (ms *MongoStorage) PushTask(t *Task) error { }) } -func (ms *MongoStorage) PopTask() (t *Task, err error) { +func (ms *MongoStorage) PopTask() (*Task, error) { v := struct { ID primitive.ObjectID `bson:"_id"` Task *Task }{} - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColTsk).FindOneAndDelete(sctx, bson.D{}) - if err := cur.Decode(&v); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColTsk).FindOneAndDelete(sctx, bson.D{}) + decodeErr := sr.Decode(&v) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { - return nil, err - } - return v.Task, nil -} - -func (ms *MongoStorage) GetResourceProfileDrv(tenant, id string) (rp *ResourceProfile, err error) { - rp = new(ResourceProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColRsP).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(rp); err != nil { - rp = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err - } - return nil + return decodeErr }) - return + return v.Task, err } -func (ms *MongoStorage) SetResourceProfileDrv(rp *ResourceProfile) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRsP).UpdateOne(sctx, bson.M{"tenant": rp.Tenant, "id": rp.ID}, +func (ms *MongoStorage) GetResourceProfileDrv(tenant, id string) (*ResourceProfile, error) { + rsProfile := new(ResourceProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColRsP).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(rsProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound + } + return decodeErr + }) + return rsProfile, err +} + +func (ms *MongoStorage) SetResourceProfileDrv(rp *ResourceProfile) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColRsP).UpdateOne(sctx, bson.M{"tenant": rp.Tenant, "id": rp.ID}, bson.M{"$set": rp}, options.Update().SetUpsert(true), ) @@ -1380,8 +1361,8 @@ func (ms *MongoStorage) SetResourceProfileDrv(rp *ResourceProfile) (err error) { }) } -func (ms *MongoStorage) RemoveResourceProfileDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveResourceProfileDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColRsP).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1390,25 +1371,22 @@ func (ms *MongoStorage) RemoveResourceProfileDrv(tenant, id string) (err error) }) } -func (ms *MongoStorage) GetResourceDrv(tenant, id string) (r *Resource, err error) { - r = new(Resource) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColRes).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - r = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetResourceDrv(tenant, id string) (*Resource, error) { + resource := new(Resource) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColRes).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(resource) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return resource, err } -func (ms *MongoStorage) SetResourceDrv(r *Resource) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRes).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetResourceDrv(r *Resource) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColRes).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1416,8 +1394,8 @@ func (ms *MongoStorage) SetResourceDrv(r *Resource) (err error) { }) } -func (ms *MongoStorage) RemoveResourceDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveResourceDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColRes).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1426,25 +1404,22 @@ func (ms *MongoStorage) RemoveResourceDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetTimingDrv(id string) (t *utils.TPTiming, err error) { - t = new(utils.TPTiming) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColTmg).FindOne(sctx, bson.M{"id": id}) - if err := cur.Decode(t); err != nil { - t = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetTimingDrv(id string) (*utils.TPTiming, error) { + timing := new(utils.TPTiming) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColTmg).FindOne(sctx, bson.M{"id": id}) + decodeErr := sr.Decode(timing) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return timing, err } -func (ms *MongoStorage) SetTimingDrv(t *utils.TPTiming) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColTmg).UpdateOne(sctx, bson.M{"id": t.ID}, +func (ms *MongoStorage) SetTimingDrv(t *utils.TPTiming) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColTmg).UpdateOne(sctx, bson.M{"id": t.ID}, bson.M{"$set": t}, options.Update().SetUpsert(true), ) @@ -1452,8 +1427,8 @@ func (ms *MongoStorage) SetTimingDrv(t *utils.TPTiming) (err error) { }) } -func (ms *MongoStorage) RemoveTimingDrv(id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveTimingDrv(id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColTmg).DeleteOne(sctx, bson.M{"id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1463,26 +1438,23 @@ func (ms *MongoStorage) RemoveTimingDrv(id string) (err error) { } // GetStatQueueProfileDrv retrieves a StatQueueProfile from dataDB -func (ms *MongoStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *StatQueueProfile, err error) { - sq = new(StatQueueProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColSqp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(sq); err != nil { - sq = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetStatQueueProfileDrv(tenant string, id string) (*StatQueueProfile, error) { + sqProfile := new(StatQueueProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColSqp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(sqProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return sqProfile, err } // SetStatQueueProfileDrv stores a StatsQueue into DataDB -func (ms *MongoStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColSqp).UpdateOne(sctx, bson.M{"tenant": sq.Tenant, "id": sq.ID}, +func (ms *MongoStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColSqp).UpdateOne(sctx, bson.M{"tenant": sq.Tenant, "id": sq.ID}, bson.M{"$set": sq}, options.Update().SetUpsert(true), ) @@ -1491,8 +1463,8 @@ func (ms *MongoStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) } // RemStatQueueProfileDrv removes a StatsQueue from dataDB -func (ms *MongoStorage) RemStatQueueProfileDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemStatQueueProfileDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColSqp).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1502,34 +1474,31 @@ func (ms *MongoStorage) RemStatQueueProfileDrv(tenant, id string) (err error) { } // GetStatQueueDrv retrieves a StoredStatQueue -func (ms *MongoStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) { +func (ms *MongoStorage) GetStatQueueDrv(tenant, id string) (*StatQueue, error) { ssq := new(StoredStatQueue) - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColSqs).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(ssq); err != nil { - sq = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColSqs).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(ssq) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { - return + return decodeErr + }) + if err != nil { + return nil, err } - sq, err = ssq.AsStatQueue(ms.ms) - return + return ssq.AsStatQueue(ms.ms) } // SetStatQueueDrv stores the metrics for a StoredStatQueue func (ms *MongoStorage) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) { if ssq == nil { if ssq, err = NewStoredStatQueue(sq, ms.ms); err != nil { - return + return err } } - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColSqs).UpdateOne(sctx, bson.M{"tenant": ssq.Tenant, "id": ssq.ID}, + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColSqs).UpdateOne(sctx, bson.M{"tenant": ssq.Tenant, "id": ssq.ID}, bson.M{"$set": ssq}, options.Update().SetUpsert(true), ) @@ -1538,8 +1507,8 @@ func (ms *MongoStorage) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (er } // RemStatQueueDrv removes stored metrics for a StoredStatQueue -func (ms *MongoStorage) RemStatQueueDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemStatQueueDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColSqs).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1549,24 +1518,21 @@ func (ms *MongoStorage) RemStatQueueDrv(tenant, id string) (err error) { } // GetThresholdProfileDrv retrieves a ThresholdProfile from dataDB -func (ms *MongoStorage) GetThresholdProfileDrv(tenant, ID string) (tp *ThresholdProfile, err error) { - tp = new(ThresholdProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColTps).FindOne(sctx, bson.M{"tenant": tenant, "id": ID}) - if err := cur.Decode(tp); err != nil { - tp = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetThresholdProfileDrv(tenant, ID string) (*ThresholdProfile, error) { + thProfile := new(ThresholdProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColTps).FindOne(sctx, bson.M{"tenant": tenant, "id": ID}) + decodeErr := sr.Decode(thProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return thProfile, err } // SetThresholdProfileDrv stores a ThresholdProfile into DataDB -func (ms *MongoStorage) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) { +func (ms *MongoStorage) SetThresholdProfileDrv(tp *ThresholdProfile) error { return ms.query(func(sctx mongo.SessionContext) (err error) { _, err = ms.getCol(ColTps).UpdateOne(sctx, bson.M{"tenant": tp.Tenant, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true), @@ -1576,8 +1542,8 @@ func (ms *MongoStorage) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) } // RemoveThresholdProfile removes a ThresholdProfile from dataDB/cache -func (ms *MongoStorage) RemThresholdProfileDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemThresholdProfileDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColTps).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1586,25 +1552,22 @@ func (ms *MongoStorage) RemThresholdProfileDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err error) { - r = new(Threshold) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColThs).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - r = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetThresholdDrv(tenant, id string) (*Threshold, error) { + th := new(Threshold) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColThs).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(th) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return th, err } -func (ms *MongoStorage) SetThresholdDrv(r *Threshold) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColThs).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetThresholdDrv(r *Threshold) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColThs).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1612,8 +1575,8 @@ func (ms *MongoStorage) SetThresholdDrv(r *Threshold) (err error) { }) } -func (ms *MongoStorage) RemoveThresholdDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveThresholdDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColThs).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1622,26 +1585,25 @@ func (ms *MongoStorage) RemoveThresholdDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetFilterDrv(tenant, id string) (r *Filter, err error) { - r = new(Filter) - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColFlt).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetFilterDrv(tenant, id string) (*Filter, error) { + fltr := new(Filter) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColFlt).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(fltr) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { + return decodeErr + }) + if err != nil { return nil, err } - return + return fltr, err } -func (ms *MongoStorage) SetFilterDrv(r *Filter) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColFlt).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetFilterDrv(r *Filter) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColFlt).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1649,8 +1611,8 @@ func (ms *MongoStorage) SetFilterDrv(r *Filter) (err error) { }) } -func (ms *MongoStorage) RemoveFilterDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveFilterDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColFlt).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1659,25 +1621,22 @@ func (ms *MongoStorage) RemoveFilterDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetRouteProfileDrv(tenant, id string) (r *RouteProfile, err error) { - r = new(RouteProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColRts).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - r = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetRouteProfileDrv(tenant, id string) (*RouteProfile, error) { + routeProfile := new(RouteProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColRts).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(routeProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return routeProfile, err } -func (ms *MongoStorage) SetRouteProfileDrv(r *RouteProfile) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRts).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetRouteProfileDrv(r *RouteProfile) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColRts).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1685,8 +1644,8 @@ func (ms *MongoStorage) SetRouteProfileDrv(r *RouteProfile) (err error) { }) } -func (ms *MongoStorage) RemoveRouteProfileDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveRouteProfileDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColRts).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1695,25 +1654,22 @@ func (ms *MongoStorage) RemoveRouteProfileDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetAttributeProfileDrv(tenant, id string) (r *AttributeProfile, err error) { - r = new(AttributeProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColAttr).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - r = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetAttributeProfileDrv(tenant, id string) (*AttributeProfile, error) { + attrProfile := new(AttributeProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColAttr).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(attrProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return attrProfile, err } -func (ms *MongoStorage) SetAttributeProfileDrv(r *AttributeProfile) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColAttr).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetAttributeProfileDrv(r *AttributeProfile) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColAttr).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1721,8 +1677,8 @@ func (ms *MongoStorage) SetAttributeProfileDrv(r *AttributeProfile) (err error) }) } -func (ms *MongoStorage) RemoveAttributeProfileDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveAttributeProfileDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColAttr).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1731,25 +1687,22 @@ func (ms *MongoStorage) RemoveAttributeProfileDrv(tenant, id string) (err error) }) } -func (ms *MongoStorage) GetChargerProfileDrv(tenant, id string) (r *ChargerProfile, err error) { - r = new(ChargerProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColCpp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - r = nil - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err +func (ms *MongoStorage) GetChargerProfileDrv(tenant, id string) (*ChargerProfile, error) { + chargerProfile := new(ChargerProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColCpp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(chargerProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil + return decodeErr }) - return + return chargerProfile, err } -func (ms *MongoStorage) SetChargerProfileDrv(r *ChargerProfile) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColCpp).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetChargerProfileDrv(r *ChargerProfile) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColCpp).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1757,8 +1710,8 @@ func (ms *MongoStorage) SetChargerProfileDrv(r *ChargerProfile) (err error) { }) } -func (ms *MongoStorage) RemoveChargerProfileDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveChargerProfileDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColCpp).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1767,25 +1720,22 @@ func (ms *MongoStorage) RemoveChargerProfileDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetDispatcherProfileDrv(tenant, id string) (r *DispatcherProfile, err error) { - r = new(DispatcherProfile) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColDpp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - r = nil - if err == mongo.ErrNoDocuments { - return utils.ErrDSPProfileNotFound - } - return err +func (ms *MongoStorage) GetDispatcherProfileDrv(tenant, id string) (*DispatcherProfile, error) { + dspProfile := new(DispatcherProfile) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColDpp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(dspProfile) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrDSPProfileNotFound } - return nil + return decodeErr }) - return + return dspProfile, err } -func (ms *MongoStorage) SetDispatcherProfileDrv(r *DispatcherProfile) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColDpp).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetDispatcherProfileDrv(r *DispatcherProfile) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColDpp).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1793,8 +1743,8 @@ func (ms *MongoStorage) SetDispatcherProfileDrv(r *DispatcherProfile) (err error }) } -func (ms *MongoStorage) RemoveDispatcherProfileDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveDispatcherProfileDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColDpp).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1803,25 +1753,22 @@ func (ms *MongoStorage) RemoveDispatcherProfileDrv(tenant, id string) (err error }) } -func (ms *MongoStorage) GetDispatcherHostDrv(tenant, id string) (r *DispatcherHost, err error) { - r = new(DispatcherHost) - err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColDph).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) - if err := cur.Decode(r); err != nil { - r = nil - if err == mongo.ErrNoDocuments { - return utils.ErrDSPHostNotFound - } - return err +func (ms *MongoStorage) GetDispatcherHostDrv(tenant, id string) (*DispatcherHost, error) { + dspHost := new(DispatcherHost) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColDph).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) + decodeErr := sr.Decode(dspHost) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrDSPHostNotFound } - return nil + return decodeErr }) - return + return dspHost, err } -func (ms *MongoStorage) SetDispatcherHostDrv(r *DispatcherHost) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColDph).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, +func (ms *MongoStorage) SetDispatcherHostDrv(r *DispatcherHost) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColDph).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, options.Update().SetUpsert(true), ) @@ -1829,8 +1776,8 @@ func (ms *MongoStorage) SetDispatcherHostDrv(r *DispatcherHost) (err error) { }) } -func (ms *MongoStorage) RemoveDispatcherHostDrv(tenant, id string) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveDispatcherHostDrv(tenant, id string) error { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColDph).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound @@ -1839,43 +1786,43 @@ func (ms *MongoStorage) RemoveDispatcherHostDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetItemLoadIDsDrv(itemIDPrefix string) (loadIDs map[string]int64, err error) { +func (ms *MongoStorage) GetItemLoadIDsDrv(itemIDPrefix string) (map[string]int64, error) { fop := options.FindOne() if itemIDPrefix != "" { fop.SetProjection(bson.M{itemIDPrefix: 1, "_id": 0}) } else { fop.SetProjection(bson.M{"_id": 0}) } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColLID).FindOne(sctx, bson.D{}, fop) - if err := cur.Decode(&loadIDs); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + loadIDs := make(map[string]int64) + err := ms.query(func(sctx mongo.SessionContext) error { + sr := ms.getCol(ColLID).FindOne(sctx, bson.D{}, fop) + decodeErr := sr.Decode(&loadIDs) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { + return decodeErr + }) + if err != nil { return nil, err } if len(loadIDs) == 0 { return nil, utils.ErrNotFound } - return + return loadIDs, nil } -func (ms *MongoStorage) SetLoadIDsDrv(loadIDs map[string]int64) (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColLID).UpdateOne(sctx, bson.D{}, bson.M{"$set": loadIDs}, +func (ms *MongoStorage) SetLoadIDsDrv(loadIDs map[string]int64) error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColLID).UpdateOne(sctx, bson.D{}, bson.M{"$set": loadIDs}, options.Update().SetUpsert(true), ) return err }) } -func (ms *MongoStorage) RemoveLoadIDsDrv() (err error) { - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColLID).DeleteMany(sctx, bson.M{}) +func (ms *MongoStorage) RemoveLoadIDsDrv() error { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColLID).DeleteMany(sctx, bson.M{}) return err }) } @@ -1883,7 +1830,7 @@ func (ms *MongoStorage) RemoveLoadIDsDrv() (err error) { // GetIndexesDrv retrieves Indexes from dataDB // the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context // id is used as a concatenated key in case of filterIndexes the id will be filterType:fieldName:fieldVal -func (ms *MongoStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) { +func (ms *MongoStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (map[string]utils.StringSet, error) { type result struct { Key string Value []string @@ -1896,19 +1843,26 @@ func (ms *MongoStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexe for _, character := range []string{".", "*"} { dbKey = strings.Replace(dbKey, character, `\`+character, strings.Count(dbKey, character)) } - //inside bson.RegEx add carrot to match the prefix (optimization) - q = bson.M{"key": bsonx.Regex("^"+dbKey, utils.EmptyString)} + // For optimization, use a caret (^) in the regex pattern. + q = bson.M{"key": primitive.Regex{Pattern: "^" + dbKey}} } - indexes = make(map[string]utils.StringSet) - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(ColIndx).Find(sctx, q) - if err != nil { - return err + indexes := make(map[string]utils.StringSet) + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(ColIndx).Find(sctx, q) + if qryErr != nil { + return qryErr } + defer func() { + closeErr := cur.Close(sctx) + if closeErr != nil && qryErr == nil { + qryErr = closeErr + } + }() for cur.Next(sctx) { var elem result - if err := cur.Decode(&elem); err != nil { - return err + qryErr = cur.Decode(&elem) + if qryErr != nil { + return qryErr } if len(elem.Value) == 0 { continue @@ -1916,8 +1870,9 @@ func (ms *MongoStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexe indexKey := strings.TrimPrefix(elem.Key, utils.CacheInstanceToPrefix[idxItmType]+tntCtx+utils.ConcatenatedKeySep) indexes[indexKey] = utils.NewStringSet(elem.Value) } - return cur.Close(sctx) - }); err != nil { + return cur.Err() + }) + if err != nil { return nil, err } if len(indexes) == 0 { @@ -1929,7 +1884,7 @@ func (ms *MongoStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexe // SetIndexesDrv stores Indexes into DataDB // the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context func (ms *MongoStorage) SetIndexesDrv(idxItmType, tntCtx string, - indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) { + indexes map[string]utils.StringSet, commit bool, transactionID string) error { originKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx dbKey := originKey if transactionID != utils.EmptyString { @@ -1938,43 +1893,44 @@ func (ms *MongoStorage) SetIndexesDrv(idxItmType, tntCtx string, if commit && transactionID != utils.EmptyString { regexKey := dbKey for _, character := range []string{".", "*"} { - regexKey = strings.Replace(regexKey, character, `\`+character, strings.Count(regexKey, character)) + regexKey = strings.ReplaceAll(regexKey, character, `\`+character) } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - var result []string - result, err = ms.getField3(sctx, ColIndx, regexKey, "key") + err := ms.query(func(sctx mongo.SessionContext) error { + result, qryErr := ms.getAllIndexKeys(sctx, regexKey) for _, key := range result { idxKey := strings.TrimPrefix(key, dbKey) - if _, err = ms.getCol(ColIndx).DeleteOne(sctx, - bson.M{"key": originKey + idxKey}); err != nil { //ensure we do not have dup - return err + if _, qryErr = ms.getCol(ColIndx).DeleteOne(sctx, + bson.M{"key": originKey + idxKey}); qryErr != nil { //ensure we do not have dup + return qryErr } - if _, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": key}, + if _, qryErr = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": key}, bson.M{"$set": bson.M{"key": originKey + idxKey}}, // only update the key - ); err != nil { - return err + ); qryErr != nil { + return qryErr } } return nil - }); err != nil { + }) + if err != nil { return err } } var lastErr error for idxKey, itmMp := range indexes { - if err = ms.query(func(sctx mongo.SessionContext) (err error) { + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { idxDbkey := utils.ConcatenatedKey(dbKey, idxKey) if len(itmMp) == 0 { // remove from DB if we set it with empty indexes - _, err = ms.getCol(ColIndx).DeleteOne(sctx, + _, qryErr = ms.getCol(ColIndx).DeleteOne(sctx, bson.M{"key": idxDbkey}) } else { - _, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": idxDbkey}, + _, qryErr = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": idxDbkey}, bson.M{"$set": bson.M{"key": idxDbkey, "value": itmMp.AsSlice()}}, options.Update().SetUpsert(true), ) } - return err - }); err != nil { + return qryErr + }) + if err != nil { lastErr = err } } @@ -1982,9 +1938,9 @@ func (ms *MongoStorage) SetIndexesDrv(idxItmType, tntCtx string, } // RemoveIndexesDrv removes the indexes -func (ms *MongoStorage) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err error) { +func (ms *MongoStorage) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) error { if len(idxKey) != 0 { - return ms.query(func(sctx mongo.SessionContext) (err error) { + return ms.query(func(sctx mongo.SessionContext) error { dr, err := ms.getCol(ColIndx).DeleteOne(sctx, bson.M{"key": utils.ConcatenatedKey(utils.CacheInstanceToPrefix[idxItmType]+tntCtx, idxKey)}) if dr.DeletedCount == 0 { @@ -1995,11 +1951,15 @@ func (ms *MongoStorage) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err } regexKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx for _, character := range []string{".", "*"} { - regexKey = strings.Replace(regexKey, character, `\`+character, strings.Count(regexKey, character)) + regexKey = strings.ReplaceAll(regexKey, character, `\`+character) } - //inside bson.RegEx add carrot to match the prefix (optimization) - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColIndx).DeleteMany(sctx, bson.M{"key": bsonx.Regex("^"+regexKey, utils.EmptyString)}) + // For optimization, use a caret (^) in the regex pattern. + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColIndx).DeleteMany(sctx, bson.M{ + "key": primitive.Regex{ + Pattern: "^" + regexKey, + }, + }) return err }) } diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index 9e37f167a..02309c0ac 100644 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -20,6 +20,7 @@ package engine import ( "context" + "errors" "fmt" "regexp" "strings" @@ -28,9 +29,9 @@ import ( "github.com/cgrates/cgrates/utils" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/x/bsonx" ) func (ms *MongoStorage) GetTpIds(colName string) (tpids []string, err error) { @@ -79,7 +80,7 @@ func (ms *MongoStorage) GetTpIds(colName string) (tpids []string, err error) { return tpids, nil } -func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinct utils.TPDistinctIds, +func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinctIDs utils.TPDistinctIds, filter map[string]string, pag *utils.PaginatorWithSearch) ([]string, error) { findMap := bson.M{} if tpid != "" { @@ -95,11 +96,16 @@ func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinct utils.TPDisti if pag != nil { if pag.Search != "" { var searchItems []bson.M - for _, d := range distinct { - searchItems = append(searchItems, bson.M{d: bsonx.Regex(".*"+regexp.QuoteMeta(pag.Search)+".*", "")}) + for _, distinctID := range distinctIDs { + searchItems = append(searchItems, + bson.M{ + distinctID: primitive.Regex{ + Pattern: ".*" + regexp.QuoteMeta(pag.Search) + ".*", + }, + }, + ) } - // findMap["$and"] = []bson.M{{"$or": searchItems}} //before - findMap["$or"] = searchItems // after + findMap["$or"] = searchItems } if pag.Paginator != nil { if pag.Limit != nil { @@ -112,32 +118,31 @@ func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinct utils.TPDisti } selectors := bson.M{"_id": 0} - for i, d := range distinct { - if d == "tag" { // convert the tag used in SQL into id used here - distinct[i] = "id" + for i, distinctID := range distinctIDs { + if distinctID == "tag" { // convert the tag used in SQL into id used here + distinctIDs[i] = "id" } - selectors[distinct[i]] = 1 + selectors[distinctIDs[i]] = 1 } fop.SetProjection(selectors) distinctIds := make(utils.StringMap) - if err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(table).Find(sctx, findMap, fop) - if err != nil { - return err + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(table).Find(sctx, findMap, fop) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { - var elem bson.D - err := cur.Decode(&elem) + var item bson.M + err := cur.Decode(&item) if err != nil { return err } - item := elem.Map() var id string - last := len(distinct) - 1 - for i, d := range distinct { - if distinctValue, ok := item[d]; ok { + last := len(distinctIDs) - 1 + for i, distinctID := range distinctIDs { + if distinctValue, ok := item[distinctID]; ok { id += distinctValue.(string) } if i < last { @@ -147,7 +152,8 @@ func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinct utils.TPDisti distinctIds[id] = true } return cur.Close(sctx) - }); err != nil { + }) + if err != nil { return nil, err } return distinctIds.Slice(), nil @@ -158,26 +164,26 @@ func (ms *MongoStorage) GetTPTimings(tpid, id string) ([]*utils.ApierTPTiming, e if id != "" { filter["id"] = id } - var results []*utils.ApierTPTiming - err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(utils.TBLTPTimings).Find(sctx, filter) - if err != nil { - return err + var tpTimings []*utils.ApierTPTiming + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(utils.TBLTPTimings).Find(sctx, filter) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { var el utils.ApierTPTiming - err := cur.Decode(&el) - if err != nil { - return err + qryErr = cur.Decode(&el) + if qryErr != nil { + return qryErr } - results = append(results, &el) + tpTimings = append(tpTimings, &el) } - if len(results) == 0 { + if len(tpTimings) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) - return results, err + return tpTimings, err } func (ms *MongoStorage) GetTPDestinations(tpid, id string) ([]*utils.TPDestination, error) { @@ -185,26 +191,26 @@ func (ms *MongoStorage) GetTPDestinations(tpid, id string) ([]*utils.TPDestinati if id != "" { filter["id"] = id } - var results []*utils.TPDestination - err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(utils.TBLTPDestinations).Find(sctx, filter) - if err != nil { - return err + var tpDestinations []*utils.TPDestination + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(utils.TBLTPDestinations).Find(sctx, filter) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { var el utils.TPDestination - err := cur.Decode(&el) - if err != nil { - return err + qryErr = cur.Decode(&el) + if qryErr != nil { + return qryErr } - results = append(results, &el) + tpDestinations = append(tpDestinations, &el) } - if len(results) == 0 { + if len(tpDestinations) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) - return results, err + return tpDestinations, err } func (ms *MongoStorage) GetTPRates(tpid, id string) ([]*utils.TPRateRALs, error) { @@ -212,11 +218,11 @@ func (ms *MongoStorage) GetTPRates(tpid, id string) ([]*utils.TPRateRALs, error) if id != "" { filter["id"] = id } - var results []*utils.TPRateRALs - err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(utils.TBLTPRates).Find(sctx, filter) - if err != nil { - return err + var tpRates []*utils.TPRateRALs + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(utils.TBLTPRates).Find(sctx, filter) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { var el utils.TPRateRALs @@ -227,14 +233,14 @@ func (ms *MongoStorage) GetTPRates(tpid, id string) ([]*utils.TPRateRALs, error) for _, rs := range el.RateSlots { rs.SetDurations() } - results = append(results, &el) + tpRates = append(tpRates, &el) } - if len(results) == 0 { + if len(tpRates) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) - return results, err + return tpRates, err } func (ms *MongoStorage) GetTPDestinationRates(tpid, id string, pag *utils.Paginator) ([]*utils.TPDestinationRate, error) { @@ -242,7 +248,7 @@ func (ms *MongoStorage) GetTPDestinationRates(tpid, id string, pag *utils.Pagina if id != "" { filter["id"] = id } - var results []*utils.TPDestinationRate + var tpDstRates []*utils.TPDestinationRate fop := options.Find() if pag != nil { if pag.Limit != nil { @@ -252,25 +258,25 @@ func (ms *MongoStorage) GetTPDestinationRates(tpid, id string, pag *utils.Pagina fop = fop.SetSkip(int64(*pag.Offset)) } } - err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(utils.TBLTPDestinationRates).Find(sctx, filter, fop) - if err != nil { - return err + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(utils.TBLTPDestinationRates).Find(sctx, filter, fop) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { var el utils.TPDestinationRate - err := cur.Decode(&el) - if err != nil { - return err + qryErr = cur.Decode(&el) + if qryErr != nil { + return qryErr } - results = append(results, &el) + tpDstRates = append(tpDstRates, &el) } - if len(results) == 0 { + if len(tpDstRates) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) - return results, err + return tpDstRates, err } func (ms *MongoStorage) GetTPRatingPlans(tpid, id string, pag *utils.Paginator) ([]*utils.TPRatingPlan, error) { @@ -278,7 +284,7 @@ func (ms *MongoStorage) GetTPRatingPlans(tpid, id string, pag *utils.Paginator) if id != "" { filter["id"] = id } - var results []*utils.TPRatingPlan + var tpRatingPlans []*utils.TPRatingPlan fop := options.Find() if pag != nil { if pag.Limit != nil { @@ -288,25 +294,25 @@ func (ms *MongoStorage) GetTPRatingPlans(tpid, id string, pag *utils.Paginator) fop = fop.SetSkip(int64(*pag.Offset)) } } - err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(utils.TBLTPRatingPlans).Find(sctx, filter, fop) - if err != nil { - return err + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(utils.TBLTPRatingPlans).Find(sctx, filter, fop) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { var el utils.TPRatingPlan - err := cur.Decode(&el) - if err != nil { - return err + qryErr = cur.Decode(&el) + if qryErr != nil { + return qryErr } - results = append(results, &el) + tpRatingPlans = append(tpRatingPlans, &el) } - if len(results) == 0 { + if len(tpRatingPlans) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) - return results, err + return tpRatingPlans, err } func (ms *MongoStorage) GetTPRatingProfiles(tp *utils.TPRatingProfile) ([]*utils.TPRatingProfile, error) { @@ -323,26 +329,26 @@ func (ms *MongoStorage) GetTPRatingProfiles(tp *utils.TPRatingProfile) ([]*utils if tp.LoadId != "" { filter["loadid"] = tp.LoadId } - var results []*utils.TPRatingProfile - err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(utils.TBLTPRatingProfiles).Find(sctx, filter) - if err != nil { - return err + var tpRatingProfiles []*utils.TPRatingProfile + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(utils.TBLTPRatingProfiles).Find(sctx, filter) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { var el utils.TPRatingProfile - err := cur.Decode(&el) - if err != nil { - return err + qryErr = cur.Decode(&el) + if qryErr != nil { + return qryErr } - results = append(results, &el) + tpRatingProfiles = append(tpRatingProfiles, &el) } - if len(results) == 0 { + if len(tpRatingProfiles) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) - return results, err + return tpRatingProfiles, err } func (ms *MongoStorage) GetTPSharedGroups(tpid, id string) ([]*utils.TPSharedGroups, error) { @@ -551,7 +557,7 @@ func (ms *MongoStorage) GetTPAccountActions(tp *utils.TPAccountActions) ([]*util } func (ms *MongoStorage) RemTpData(table, tpid string, args map[string]string) error { - if len(table) == 0 { // Remove tpid out of all tables + if table == utils.EmptyString { // Remove tpid out of all tables return ms.query(func(sctx mongo.SessionContext) error { col, err := ms.DB().ListCollections(sctx, bson.D{}, options.ListCollections().SetNameOnly(true)) if err != nil { @@ -890,7 +896,9 @@ func (ms *MongoStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri filter[OriginHostLow] = originHost } if originIDPrefix != "" { - filter[OriginIDLow] = bsonx.Regex(fmt.Sprintf("^%s", originIDPrefix), "") + filter[OriginIDLow] = primitive.Regex{ + Pattern: "^" + originIDPrefix, + } } err = ms.query(func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(utils.SessionCostsTBL).Find(sctx, filter) @@ -934,7 +942,7 @@ func (ms *MongoStorage) RemoveSMCosts(qryFltr *utils.SMCostFilter) error { func (ms *MongoStorage) SetCDR(cdr *CDR, allowUpdate bool) error { if cdr.OrderID == 0 { - cdr.OrderID = ms.cnter.Next() + cdr.OrderID = ms.counter.Next() } return ms.query(func(sctx mongo.SessionContext) (err error) { if allowUpdate { @@ -984,20 +992,22 @@ func (ms *MongoStorage) cleanEmptyFilters(filters bson.M) { } // _, err := col(ColCDRs).UpdateAll(bson.M{CGRIDLow: bson.M{"$in": cgrIds}}, bson.M{"$set": bson.M{"deleted_at": time.Now()}}) -func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, int64, error) { +func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) (cdrs []*CDR, n int64, err error) { var minUsage, maxUsage *time.Duration - if len(qryFltr.MinUsage) != 0 { - if parsed, err := utils.ParseDurationWithNanosecs(qryFltr.MinUsage); err != nil { + if qryFltr.MinUsage != utils.EmptyString { + parsedDur, err := utils.ParseDurationWithNanosecs(qryFltr.MinUsage) + if err != nil { return nil, 0, err } else { - minUsage = &parsed + minUsage = &parsedDur } } - if len(qryFltr.MaxUsage) != 0 { - if parsed, err := utils.ParseDurationWithNanosecs(qryFltr.MaxUsage); err != nil { + if qryFltr.MaxUsage != utils.EmptyString { + parsedDur, err := utils.ParseDurationWithNanosecs(qryFltr.MaxUsage) + if err != nil { return nil, 0, err } else { - maxUsage = &parsed + maxUsage = &parsedDur } } filters := bson.M{ @@ -1018,20 +1028,20 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, CreatedAtLow: bson.M{"$gte": qryFltr.CreatedAtStart, "$lt": qryFltr.CreatedAtEnd}, UpdatedAtLow: bson.M{"$gte": qryFltr.UpdatedAtStart, "$lt": qryFltr.UpdatedAtEnd}, UsageLow: bson.M{"$gte": minUsage, "$lt": maxUsage}, - //CostDetailsLow + "." + AccountLow: bson.M{"$in": qryFltr.RatedAccounts, "$nin": qryFltr.NotRatedAccounts}, - //CostDetailsLow + "." + SubjectLow: bson.M{"$in": qryFltr.RatedSubjects, "$nin": qryFltr.NotRatedSubjects}, + // CostDetailsLow + "." + AccountLow: bson.M{"$in": qryFltr.RatedAccounts, "$nin": qryFltr.NotRatedAccounts}, + // CostDetailsLow + "." + SubjectLow: bson.M{"$in": qryFltr.RatedSubjects, "$nin": qryFltr.NotRatedSubjects}, } - //file, _ := os.TempFile(os.TempDir(), "debug") - //file.WriteString(fmt.Sprintf("FILTER: %v\n", utils.ToIJSON(qryFltr))) - //file.WriteString(fmt.Sprintf("BEFORE: %v\n", utils.ToIJSON(filters))) + // file, _ := os.TempFile(os.TempDir(), "debug") + // file.WriteString(fmt.Sprintf("FILTER: %v\n", utils.ToIJSON(qryFltr))) + // file.WriteString(fmt.Sprintf("BEFORE: %v\n", utils.ToIJSON(filters))) ms.cleanEmptyFilters(filters) if len(qryFltr.DestinationPrefixes) != 0 { var regexpRule string for _, prefix := range qryFltr.DestinationPrefixes { - if len(prefix) == 0 { + if prefix == utils.EmptyString { continue } - if len(regexpRule) != 0 { + if regexpRule != utils.EmptyString { regexpRule += "|" } regexpRule += "^(" + regexp.QuoteMeta(prefix) + ")" @@ -1039,17 +1049,29 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, if _, hasIt := filters["$and"]; !hasIt { filters["$and"] = make([]bson.M, 0) } - filters["$and"] = append(filters["$and"].([]bson.M), bson.M{DestinationLow: bsonx.Regex(regexpRule, "")}) // $and gathers all rules not fitting top level query + // The "$and" operator is used to include additional query conditions that cannot be + // represented at the top level of the query. + filters["$and"] = append(filters["$and"].([]bson.M), bson.M{ + DestinationLow: primitive.Regex{ + Pattern: regexpRule, + }, + }) } if len(qryFltr.NotDestinationPrefixes) != 0 { if _, hasIt := filters["$and"]; !hasIt { filters["$and"] = make([]bson.M, 0) } for _, prefix := range qryFltr.NotDestinationPrefixes { - if len(prefix) == 0 { + if prefix == utils.EmptyString { continue } - filters["$and"] = append(filters["$and"].([]bson.M), bson.M{DestinationLow: bsonx.Regex("^(?!"+prefix+")", "")}) + filters["$and"] = append(filters["$and"].([]bson.M), + bson.M{ + DestinationLow: primitive.Regex{ + Pattern: "^(?!" + prefix + ")", + }, + }, + ) } } @@ -1094,19 +1116,18 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, filters[CostLow] = bson.M{"$lt": *qryFltr.MaxCost} } } - //file.WriteString(fmt.Sprintf("AFTER: %v\n", utils.ToIJSON(filters))) - //file.Close() + // file.WriteString(fmt.Sprintf("AFTER: %v\n", utils.ToIJSON(filters))) + // file.Close() if remove { - var chgd int64 - err := ms.query(func(sctx mongo.SessionContext) (err error) { - dr, err := ms.getCol(ColCDRs).DeleteMany(sctx, filters) - if err != nil { - return err + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + dr, qryErr := ms.getCol(ColCDRs).DeleteMany(sctx, filters) + if qryErr != nil { + return qryErr } - chgd = dr.DeletedCount - return err + n = dr.DeletedCount + return qryErr }) - return nil, chgd, err + return nil, n, err } fop := options.Find() cop := options.Count() @@ -1139,26 +1160,26 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, case utils.Cost: orderVal += "cost" default: - return nil, 0, fmt.Errorf("Invalid value : %s", separateVals[0]) + return nil, 0, fmt.Errorf("invalid value : %s", separateVals[0]) } fop = fop.SetSort(bson.M{orderVal: ordVal}) } if qryFltr.Count { var cnt int64 - if err := ms.query(func(sctx mongo.SessionContext) (err error) { - cnt, err = ms.getCol(ColCDRs).CountDocuments(sctx, filters, cop) - return err - }); err != nil { + err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cnt, qryErr = ms.getCol(ColCDRs).CountDocuments(sctx, filters, cop) + return qryErr + }) + if err != nil { return nil, 0, err } return nil, cnt, nil } // Execute query - var cdrs []*CDR - err := ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(ColCDRs).Find(sctx, filters, fop) - if err != nil { - return err + err = ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(ColCDRs).Find(sctx, filters, fop) + if qryErr != nil { + return qryErr } for cur.Next(sctx) { cdr := CDR{} @@ -1532,37 +1553,40 @@ func (ms *MongoStorage) SetTPDispatcherHosts(tpDPPs []*utils.TPDispatcherHost) ( }) } -func (ms *MongoStorage) GetVersions(itm string) (vrs Versions, err error) { +func (ms *MongoStorage) GetVersions(itm string) (Versions, error) { fop := options.FindOne() if itm != "" { fop.SetProjection(bson.M{itm: 1, "_id": 0}) } else { fop.SetProjection(bson.M{"_id": 0}) } - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColVer).FindOne(sctx, bson.D{}, fop) - if err := cur.Decode(&vrs); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err + var vrs Versions + err := ms.query(func(sctx mongo.SessionContext) (err error) { + sr := ms.getCol(ColVer).FindOne(sctx, bson.D{}, fop) + decodeErr := sr.Decode(&vrs) + if errors.Is(decodeErr, mongo.ErrNoDocuments) { + return utils.ErrNotFound } - return nil - }); err != nil { + return decodeErr + }) + if err != nil { return nil, err } if len(vrs) == 0 { return nil, utils.ErrNotFound } - return + return vrs, nil } -func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) (err error) { +func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) error { if overwrite { - ms.RemoveVersions(nil) + err := ms.RemoveVersions(nil) + if err != nil && !errors.Is(err, utils.ErrNotFound) { + return err + } } - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColVer).UpdateOne(sctx, bson.D{}, bson.M{"$set": vrs}, + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColVer).UpdateOne(sctx, bson.D{}, bson.M{"$set": vrs}, options.Update().SetUpsert(true), ) return err @@ -1575,23 +1599,22 @@ func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) (err error) { // _, err = col.Upsert(bson.M{}, bson.M{"$set": &vrs}) } -func (ms *MongoStorage) RemoveVersions(vrs Versions) (err error) { +func (ms *MongoStorage) RemoveVersions(vrs Versions) error { if len(vrs) == 0 { - return ms.query(func(sctx mongo.SessionContext) (err error) { - var dr *mongo.DeleteResult - dr, err = ms.getCol(ColVer).DeleteOne(sctx, bson.D{}) + return ms.query(func(sctx mongo.SessionContext) error { + dr, err := ms.getCol(ColVer).DeleteOne(sctx, bson.D{}) if err != nil { - return + return err } if dr.DeletedCount == 0 { return utils.ErrNotFound } - return + return nil }) } - return ms.query(func(sctx mongo.SessionContext) (err error) { + return ms.query(func(sctx mongo.SessionContext) error { for k := range vrs { - if _, err = ms.getCol(ColVer).UpdateOne(sctx, bson.D{}, bson.M{"$unset": bson.M{k: 1}}, + if _, err := ms.getCol(ColVer).UpdateOne(sctx, bson.D{}, bson.M{"$unset": bson.M{k: 1}}, options.Update().SetUpsert(true)); err != nil { return err } diff --git a/engine/storage_utils.go b/engine/storage_utils.go index f97b388f9..02f9cc508 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -20,6 +20,9 @@ package engine import ( "fmt" + "net" + "net/url" + "path" "strconv" "strings" "time" @@ -81,6 +84,24 @@ func NewStorDBConn(dbType, host, port, name, user, pass, marshaler string, return } +func buildURL(scheme, host, port, db, user, pass string) (*url.URL, error) { + u, err := url.Parse("//" + host) + if err != nil { + return nil, err + } + if port != "0" { + u.Host = net.JoinHostPort(u.Host, port) + } + if user != "" && pass != "" { + u.User = url.UserPassword(user, pass) + } + if db != "" { + u.Path = path.Join(u.Path, db) + } + u.Scheme = scheme + return u, nil +} + // SMCost stores one Cost coming from SM type SMCost struct { CGRID string diff --git a/go.mod b/go.mod index b128ce9e4..9a198b7a5 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/peterh/liner v1.2.2 github.com/rabbitmq/amqp091-go v1.9.0 github.com/segmentio/kafka-go v0.4.44 - go.mongodb.org/mongo-driver v1.11.0 + go.mongodb.org/mongo-driver v1.12.1 golang.org/x/crypto v0.14.0 golang.org/x/net v0.17.0 golang.org/x/oauth2 v0.13.0 @@ -94,7 +94,6 @@ require ( github.com/jinzhu/now v1.1.5 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.17.1 // indirect - github.com/kr/pretty v0.2.1 // indirect github.com/lib/pq v1.8.0 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/minio/highwayhash v1.0.2 // indirect @@ -105,7 +104,6 @@ require ( github.com/nats-io/nuid v1.0.1 // indirect github.com/philhofer/fwd v1.1.1 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/steveyen/gtreap v0.1.0 // indirect github.com/syndtr/goleveldb v1.0.0 // indirect diff --git a/go.sum b/go.sum index 91214e768..17d816eaf 100644 --- a/go.sum +++ b/go.sum @@ -253,11 +253,8 @@ github.com/kljensen/snowball v0.6.0/go.mod h1:27N7E8fVU5H68RlUmnWwZCfxgt4POBJfEN github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= @@ -366,7 +363,6 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= @@ -377,8 +373,6 @@ github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpP github.com/tebeka/snowball v0.4.2/go.mod h1:4IfL14h1lvwZcp1sfXuuc7/7yCsvVffTWxWxCLfFpYg= github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c h1:g+WoO5jjkqGAzHWCjJB1zZfXPIAaDpzXIEJ0eS6B5Ok= github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c/go.mod h1:ahpPrc7HpcfEWDQRZEmnXMzHY03mLDYMCxeDzy46i+8= -github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= -github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0= @@ -390,10 +384,8 @@ github.com/willf/bitset v1.1.11 h1:N7Z7E9UvjW+sGsEl7k/SJrvY2reP1A07MrGuCjIOjRE= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= -github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= -github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= @@ -404,8 +396,8 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= -go.mongodb.org/mongo-driver v1.11.0 h1:FZKhBSTydeuffHj9CBjXlR8vQLee1cQyTWYPA6/tqiE= -go.mongodb.org/mongo-driver v1.11.0/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8= +go.mongodb.org/mongo-driver v1.12.1 h1:nLkghSU8fQNaK7oUmDhQFsnrtcoNy7Z6LVFKsEecqgE= +go.mongodb.org/mongo-driver v1.12.1/go.mod h1:/rGBTebI3XYboVmgz+Wv3Bcbl3aD0QF9zl6kDDw18rQ= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -476,7 +468,6 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=