diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index a84a20a64..9c5ba2974 100644 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -328,6 +328,80 @@ func (ms *MongoStorage) cleanEmptyFilters(filters bson.M) { } } -func (ms *MongoStorage) RemoveCDRs(_ *context.Context, qryFltr []*Filter) (err error) { - return utils.ErrNotImplemented +// RemoveCDRs removes CDRs from MongoDB based on provided query filters. +func (ms *MongoStorage) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err error) { + var excludedCdrQueryFilterTypes []*FilterRule + filters := make(bson.M) + + // Build MongoDB filters based on the query filters provided. + for _, fltr := range qryFltr { + for _, rule := range fltr.Rules { + + // Check if the rule type is supported for direct database querying. + if !cdrQueryFilterTypes.Has(rule.Type) || checkNestedFields(rule.Element, rule.Values) { + excludedCdrQueryFilterTypes = append(excludedCdrQueryFilterTypes, rule) + continue + } + + // Determine the field to be filtered in MongoDB. + var elem string + if strings.HasPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq) { + elem = "event." + strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+".") + } else { + elem = "opts." + strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaOpts+".") + } + + // Build a MongoDB filter for the element. + filters[elem] = ms.valueQry(filters, elem, rule.Type, rule.Values, strings.HasPrefix(rule.Type, utils.MetaNot)) + } + } + ms.cleanEmptyFilters(filters) + + // If there are no excluded filter types, delete all matching documents. + if len(excludedCdrQueryFilterTypes) == 0 { + return ms.query(ctx, func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColCDRs).DeleteMany(sctx, filters) + return err + }) + } + + // Process the filters that cannot be directly queried in the database. + err = ms.query(ctx, func(sctx mongo.SessionContext) error { + cur, err := ms.getCol(ColCDRs).Find(sctx, filters) + if err != nil { + return err + } + defer cur.Close(sctx) + for cur.Next(sctx) { + cdr := CDR{} + if err := cur.Decode(&cdr); err != nil { + return err + } + var pass bool + dP := cdr.CGREvent().AsDataProvider() + + // Check the excluded filters against the CDR. + for _, fltr := range excludedCdrQueryFilterTypes { + pass, err = fltr.Pass(ctx, dP) + if err != nil { + return err + } + if !pass { + break + } + } + + // If the CDR passes the filters, remove it. + if pass { + _, err := ms.getCol(ColCDRs).DeleteOne(sctx, bson.M{ + "opts.*cdrID": utils.IfaceAsString(cdr.Opts[utils.MetaCDRID]), + }) + if err != nil { + return err + } + } + } + return cur.Err() + }) + return err }