/* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ package engine import ( "regexp" "strings" "time" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/utils" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/x/bsonx" ) func (ms *MongoStorage) GetTpIds(colName string) (tpids []string, err error) { getTpIDs := func(ctx mongo.SessionContext, col string, tpMap utils.StringSet) (utils.StringSet, error) { if strings.HasPrefix(col, "tp_") { result, err := ms.getCol(col).Distinct(ctx, "tpid", bson.D{}) if err != nil { return tpMap, err } for _, tpid := range result { tpMap.Add(tpid.(string)) } } return tpMap, nil } tpidMap := make(utils.StringSet) if colName == "" { if err := ms.query(context.TODO(), func(sctx mongo.SessionContext) error { col, err := ms.DB().ListCollections(sctx, bson.D{}, options.ListCollections().SetNameOnly(true)) if err != nil { return err } for col.Next(sctx) { var elem struct{ Name string } if err := col.Decode(&elem); err != nil { return err } if tpidMap, err = getTpIDs(sctx, elem.Name, tpidMap); err != nil { return err } } return col.Close(sctx) }); err != nil { return nil, err } } else { if err := ms.query(context.TODO(), func(sctx mongo.SessionContext) error { tpidMap, err = getTpIDs(sctx, colName, tpidMap) return err }); err != nil { return nil, err } } tpids = tpidMap.AsSlice() return tpids, nil } func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinct []string, filter map[string]string, pag *utils.PaginatorWithSearch) ([]string, error) { findMap := bson.M{} if tpid != "" { findMap["tpid"] = tpid } for k, v := range filter { if k != "" && v != "" { findMap[k] = v } } fop := options.Find() if pag != nil { if pag.Search != "" { var searchItems []bson.M for _, d := range distinct { searchItems = append(searchItems, bson.M{d: bsonx.Regex(".*"+regexp.QuoteMeta(pag.Search)+".*", "")}) } // findMap["$and"] = []bson.M{{"$or": searchItems}} //before findMap["$or"] = searchItems // after } if pag.Paginator != nil { if pag.Limit != nil { fop = fop.SetLimit(int64(*pag.Limit)) } if pag.Offset != nil { fop = fop.SetSkip(int64(*pag.Offset)) } } } selectors := bson.M{"_id": 0} for i, d := range distinct { if d == "tag" { // convert the tag used in SQL into id used here distinct[i] = "id" } selectors[distinct[i]] = 1 } fop.SetProjection(selectors) distinctIds := make(utils.StringSet) if err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(table).Find(sctx, findMap, fop) if err != nil { return err } for cur.Next(sctx) { var elem bson.D err := cur.Decode(&elem) if err != nil { return err } item := elem.Map() var id string last := len(distinct) - 1 for i, d := range distinct { if distinctValue, ok := item[d]; ok { id += distinctValue.(string) } if i < last { id += utils.ConcatenatedKeySep } } distinctIds.Add(id) } return cur.Close(sctx) }); err != nil { return nil, err } return distinctIds.AsSlice(), nil } func (ms *MongoStorage) GetTPResources(tpid, tenant, id string) ([]*utils.TPResourceProfile, error) { filter := bson.M{"tpid": tpid} if id != "" { filter["id"] = id } if tenant != "" { filter["tenant"] = tenant } var results []*utils.TPResourceProfile err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(utils.TBLTPResources).Find(sctx, filter) if err != nil { return err } for cur.Next(sctx) { var el utils.TPResourceProfile err := cur.Decode(&el) if err != nil { return err } results = append(results, &el) } if len(results) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) return results, err } func (ms *MongoStorage) GetTPStats(tpid, tenant, id string) ([]*utils.TPStatProfile, error) { filter := bson.M{ "tpid": tpid, } if id != "" { filter["id"] = id } if tenant != "" { filter["tenant"] = tenant } var results []*utils.TPStatProfile err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(utils.TBLTPStats).Find(sctx, filter) if err != nil { return err } for cur.Next(sctx) { var el utils.TPStatProfile err := cur.Decode(&el) if err != nil { return err } results = append(results, &el) } if len(results) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) return results, err } func (ms *MongoStorage) RemTpData(table, tpid string, args map[string]string) error { if len(table) == 0 { // Remove tpid out of all tables return ms.query(context.TODO(), func(sctx mongo.SessionContext) error { col, err := ms.DB().ListCollections(sctx, bson.D{}, options.ListCollections().SetNameOnly(true)) if err != nil { return err } for col.Next(sctx) { var elem struct{ Name string } if err := col.Decode(&elem); err != nil { return err } if strings.HasPrefix(elem.Name, "tp_") { _, err = ms.getCol(elem.Name).DeleteMany(sctx, bson.M{"tpid": tpid}) if err != nil { return err } } } return col.Close(sctx) }) } // Remove from a single table if args == nil { args = make(map[string]string) } if _, has := args["tag"]; has { // API uses tag to be compatible with SQL models, fix it here args["id"] = args["tag"] delete(args, "tag") } if tpid != "" { args["tpid"] = tpid } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { dr, err := ms.getCol(table).DeleteOne(sctx, args) if dr.DeletedCount == 0 { return utils.ErrNotFound } return err }) } func (ms *MongoStorage) SetTPResources(tpRLs []*utils.TPResourceProfile) (err error) { if len(tpRLs) == 0 { return } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { for _, tp := range tpRLs { _, err = ms.getCol(utils.TBLTPResources).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true)) if err != nil { return err } } return nil }) } func (ms *MongoStorage) SetTPRStats(tps []*utils.TPStatProfile) (err error) { if len(tps) == 0 { return } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { for _, tp := range tps { _, err = ms.getCol(utils.TBLTPStats).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true)) if err != nil { return err } } return nil }) } func (ms *MongoStorage) cleanEmptyFilters(filters bson.M) { for k, v := range filters { switch value := v.(type) { case *int64: if value == nil { delete(filters, k) } case *float64: if value == nil { delete(filters, k) } case *time.Time: if value == nil { delete(filters, k) } case *time.Duration: if value == nil { delete(filters, k) } case []string: if len(value) == 0 { delete(filters, k) } case bson.M: ms.cleanEmptyFilters(value) if len(value) == 0 { delete(filters, k) } } } } func (ms *MongoStorage) SetTPStats(tpSTs []*utils.TPStatProfile) (err error) { if len(tpSTs) == 0 { return } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { for _, tp := range tpSTs { _, err = ms.getCol(utils.TBLTPStats).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true), ) if err != nil { return err } } return nil }) } func (ms *MongoStorage) GetTPThresholds(tpid, tenant, id string) ([]*utils.TPThresholdProfile, error) { filter := bson.M{"tpid": tpid} if id != "" { filter["id"] = id } if tenant != "" { filter["tenant"] = tenant } var results []*utils.TPThresholdProfile err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(utils.TBLTPThresholds).Find(sctx, filter) if err != nil { return err } for cur.Next(sctx) { var tp utils.TPThresholdProfile err := cur.Decode(&tp) if err != nil { return err } results = append(results, &tp) } if len(results) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) return results, err } func (ms *MongoStorage) SetTPThresholds(tpTHs []*utils.TPThresholdProfile) (err error) { if len(tpTHs) == 0 { return } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { for _, tp := range tpTHs { _, err = ms.getCol(utils.TBLTPThresholds).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true), ) if err != nil { return err } } return nil }) } func (ms *MongoStorage) GetTPFilters(tpid, tenant, id string) ([]*utils.TPFilterProfile, error) { filter := bson.M{"tpid": tpid} if id != "" { filter["id"] = id } if tenant != "" { filter["tenant"] = tenant } results := []*utils.TPFilterProfile{} err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(utils.TBLTPFilters).Find(sctx, filter) if err != nil { return err } for cur.Next(sctx) { var tp utils.TPFilterProfile err := cur.Decode(&tp) if err != nil { return err } results = append(results, &tp) } if len(results) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) return results, err } func (ms *MongoStorage) SetTPFilters(tpTHs []*utils.TPFilterProfile) (err error) { if len(tpTHs) == 0 { return } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { for _, tp := range tpTHs { _, err = ms.getCol(utils.TBLTPFilters).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true), ) if err != nil { return err } } return nil }) } func (ms *MongoStorage) GetTPRoutes(tpid, tenant, id string) ([]*utils.TPRouteProfile, error) { filter := bson.M{"tpid": tpid} if id != "" { filter["id"] = id } if tenant != "" { filter["tenant"] = tenant } var results []*utils.TPRouteProfile err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(utils.TBLTPRoutes).Find(sctx, filter) if err != nil { return err } for cur.Next(sctx) { var tp utils.TPRouteProfile err := cur.Decode(&tp) if err != nil { return err } results = append(results, &tp) } if len(results) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) return results, err } func (ms *MongoStorage) SetTPRoutes(tpRoutes []*utils.TPRouteProfile) (err error) { if len(tpRoutes) == 0 { return } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { for _, tp := range tpRoutes { _, err = ms.getCol(utils.TBLTPRoutes).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true), ) if err != nil { return err } } return nil }) } func (ms *MongoStorage) GetTPAttributes(tpid, tenant, id string) ([]*utils.TPAttributeProfile, error) { filter := bson.M{"tpid": tpid} if id != "" { filter["id"] = id } if tenant != "" { filter["tenant"] = tenant } var results []*utils.TPAttributeProfile err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(utils.TBLTPAttributes).Find(sctx, filter) if err != nil { return err } for cur.Next(sctx) { var tp utils.TPAttributeProfile err := cur.Decode(&tp) if err != nil { return err } results = append(results, &tp) } if len(results) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) return results, err } func (ms *MongoStorage) SetTPAttributes(tpRoutes []*utils.TPAttributeProfile) (err error) { if len(tpRoutes) == 0 { return } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { for _, tp := range tpRoutes { _, err = ms.getCol(utils.TBLTPAttributes).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true), ) if err != nil { return err } } return nil }) } func (ms *MongoStorage) GetTPChargers(tpid, tenant, id string) ([]*utils.TPChargerProfile, error) { filter := bson.M{"tpid": tpid} if id != "" { filter["id"] = id } if tenant != "" { filter["tenant"] = tenant } var results []*utils.TPChargerProfile err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(utils.TBLTPChargers).Find(sctx, filter) if err != nil { return err } for cur.Next(sctx) { var tp utils.TPChargerProfile err := cur.Decode(&tp) if err != nil { return err } results = append(results, &tp) } if len(results) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) return results, err } func (ms *MongoStorage) SetTPChargers(tpCPP []*utils.TPChargerProfile) (err error) { if len(tpCPP) == 0 { return } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { for _, tp := range tpCPP { _, err = ms.getCol(utils.TBLTPChargers).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true), ) if err != nil { return err } } return nil }) } func (ms *MongoStorage) GetTPDispatcherProfiles(tpid, tenant, id string) ([]*utils.TPDispatcherProfile, error) { filter := bson.M{"tpid": tpid} if id != "" { filter["id"] = id } if tenant != "" { filter["tenant"] = tenant } var results []*utils.TPDispatcherProfile err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(utils.TBLTPDispatchers).Find(sctx, filter) if err != nil { return err } for cur.Next(sctx) { var tp utils.TPDispatcherProfile err := cur.Decode(&tp) if err != nil { return err } results = append(results, &tp) } if len(results) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) return results, err } func (ms *MongoStorage) SetTPDispatcherProfiles(tpDPPs []*utils.TPDispatcherProfile) (err error) { if len(tpDPPs) == 0 { return } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { for _, tp := range tpDPPs { _, err = ms.getCol(utils.TBLTPDispatchers).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true), ) if err != nil { return err } } return nil }) } func (ms *MongoStorage) GetTPDispatcherHosts(tpid, tenant, id string) ([]*utils.TPDispatcherHost, error) { filter := bson.M{"tpid": tpid} if id != "" { filter["id"] = id } if tenant != "" { filter["tenant"] = tenant } var results []*utils.TPDispatcherHost err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(utils.TBLTPDispatcherHosts).Find(sctx, filter) if err != nil { return err } for cur.Next(sctx) { var tp utils.TPDispatcherHost err := cur.Decode(&tp) if err != nil { return err } results = append(results, &tp) } if len(results) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) return results, err } func (ms *MongoStorage) SetTPDispatcherHosts(tpDPPs []*utils.TPDispatcherHost) (err error) { if len(tpDPPs) == 0 { return } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { for _, tp := range tpDPPs { _, err = ms.getCol(utils.TBLTPDispatcherHosts).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true), ) if err != nil { return err } } return nil }) } func (ms *MongoStorage) GetTPRateProfiles(tpid, tenant, id string) ([]*utils.TPRateProfile, error) { filter := bson.M{"tpid": tpid} if id != "" { filter["id"] = id } if tenant != "" { filter["tenant"] = tenant } var results []*utils.TPRateProfile err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(utils.TBLTPRateProfiles).Find(sctx, filter) if err != nil { return err } for cur.Next(sctx) { var tp utils.TPRateProfile err := cur.Decode(&tp) if err != nil { return err } results = append(results, &tp) } if len(results) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) return results, err } func (ms *MongoStorage) SetTPRateProfiles(tpDPPs []*utils.TPRateProfile) (err error) { if len(tpDPPs) == 0 { return } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { for _, tp := range tpDPPs { _, err = ms.getCol(utils.TBLTPRateProfiles).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true), ) if err != nil { return err } } return nil }) } func (ms *MongoStorage) GetTPActionProfiles(tpid, tenant, id string) ([]*utils.TPActionProfile, error) { filter := bson.M{"tpid": tpid} if id != "" { filter["id"] = id } if tenant != "" { filter["tenant"] = tenant } var results []*utils.TPActionProfile err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(utils.TBLTPActionProfiles).Find(sctx, filter) if err != nil { return err } for cur.Next(sctx) { var tp utils.TPActionProfile err := cur.Decode(&tp) if err != nil { return err } results = append(results, &tp) } if len(results) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) return results, err } func (ms *MongoStorage) GetTPAccounts(tpid, tenant, id string) ([]*utils.TPAccount, error) { filter := bson.M{"tpid": tpid} if id != "" { filter["id"] = id } if tenant != "" { filter["tenant"] = tenant } var results []*utils.TPAccount err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(utils.TBLTPAccounts).Find(sctx, filter) if err != nil { return err } for cur.Next(sctx) { var tp utils.TPAccount err := cur.Decode(&tp) if err != nil { return err } results = append(results, &tp) } if len(results) == 0 { return utils.ErrNotFound } return cur.Close(sctx) }) return results, err } func (ms *MongoStorage) SetTPActionProfiles(tpAps []*utils.TPActionProfile) (err error) { if len(tpAps) == 0 { return } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { for _, tp := range tpAps { _, err = ms.getCol(utils.TBLTPActionProfiles).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true), ) if err != nil { return err } } return nil }) } func (ms *MongoStorage) SetTPAccounts(tpAps []*utils.TPAccount) (err error) { if len(tpAps) == 0 { return } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { for _, tp := range tpAps { _, err = ms.getCol(utils.TBLTPAccounts).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true), ) if err != nil { return err } } return nil }) } func (ms *MongoStorage) GetVersions(itm string) (vrs Versions, err error) { fop := options.FindOne() if itm != "" { fop.SetProjection(bson.M{itm: 1, "_id": 0}) } else { fop.SetProjection(bson.M{"_id": 0}) } if err = ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur := ms.getCol(ColVer).FindOne(sctx, bson.D{}, fop) if err := cur.Decode(&vrs); err != nil { if err == mongo.ErrNoDocuments { return utils.ErrNotFound } return err } return nil }); err != nil { return nil, err } if len(vrs) == 0 { return nil, utils.ErrNotFound } return } func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) (err error) { if overwrite { ms.RemoveVersions(nil) } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { _, err = ms.getCol(ColVer).UpdateOne(sctx, bson.D{}, bson.M{"$set": vrs}, options.Update().SetUpsert(true), ) return err }) // } // return ms.query( func(sctx mongo.SessionContext) error { // _, err := ms.getCol(ColVer).InsertOne(sctx, vrs) // return err // }) // _, err = col.Upsert(bson.M{}, bson.M{"$set": &vrs}) } func (ms *MongoStorage) RemoveVersions(vrs Versions) (err error) { if len(vrs) == 0 { return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { var dr *mongo.DeleteResult dr, err = ms.getCol(ColVer).DeleteOne(sctx, bson.D{}) if err != nil { return } if dr.DeletedCount == 0 { return utils.ErrNotFound } return }) } return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { for k := range vrs { if _, err = ms.getCol(ColVer).UpdateOne(sctx, bson.D{}, bson.M{"$unset": bson.M{k: 1}}, options.Update().SetUpsert(true)); err != nil { return err } } return nil }) } func (ms *MongoStorage) GetStorageType() string { return utils.Mongo }