Implement RemoveCDRs for MongoStorage (untested and not optimized)

This commit is contained in:
ionutboangiu
2023-12-07 12:44:12 -05:00
committed by Dan Christian Bogos
parent fb6b38850c
commit ab26ba2d02

View File

@@ -328,6 +328,80 @@ func (ms *MongoStorage) cleanEmptyFilters(filters bson.M) {
}
}
func (ms *MongoStorage) RemoveCDRs(_ *context.Context, qryFltr []*Filter) (err error) {
return utils.ErrNotImplemented
// RemoveCDRs removes CDRs from MongoDB based on provided query filters.
func (ms *MongoStorage) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err error) {
var excludedCdrQueryFilterTypes []*FilterRule
filters := make(bson.M)
// Build MongoDB filters based on the query filters provided.
for _, fltr := range qryFltr {
for _, rule := range fltr.Rules {
// Check if the rule type is supported for direct database querying.
if !cdrQueryFilterTypes.Has(rule.Type) || checkNestedFields(rule.Element, rule.Values) {
excludedCdrQueryFilterTypes = append(excludedCdrQueryFilterTypes, rule)
continue
}
// Determine the field to be filtered in MongoDB.
var elem string
if strings.HasPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq) {
elem = "event." + strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaReq+".")
} else {
elem = "opts." + strings.TrimPrefix(rule.Element, utils.DynamicDataPrefix+utils.MetaOpts+".")
}
// Build a MongoDB filter for the element.
filters[elem] = ms.valueQry(filters, elem, rule.Type, rule.Values, strings.HasPrefix(rule.Type, utils.MetaNot))
}
}
ms.cleanEmptyFilters(filters)
// If there are no excluded filter types, delete all matching documents.
if len(excludedCdrQueryFilterTypes) == 0 {
return ms.query(ctx, func(sctx mongo.SessionContext) error {
_, err := ms.getCol(ColCDRs).DeleteMany(sctx, filters)
return err
})
}
// Process the filters that cannot be directly queried in the database.
err = ms.query(ctx, func(sctx mongo.SessionContext) error {
cur, err := ms.getCol(ColCDRs).Find(sctx, filters)
if err != nil {
return err
}
defer cur.Close(sctx)
for cur.Next(sctx) {
cdr := CDR{}
if err := cur.Decode(&cdr); err != nil {
return err
}
var pass bool
dP := cdr.CGREvent().AsDataProvider()
// Check the excluded filters against the CDR.
for _, fltr := range excludedCdrQueryFilterTypes {
pass, err = fltr.Pass(ctx, dP)
if err != nil {
return err
}
if !pass {
break
}
}
// If the CDR passes the filters, remove it.
if pass {
_, err := ms.getCol(ColCDRs).DeleteOne(sctx, bson.M{
"opts.*cdrID": utils.IfaceAsString(cdr.Opts[utils.MetaCDRID]),
})
if err != nil {
return err
}
}
}
return cur.Err()
})
return err
}