mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Clean up and refactor after mongo update
- Converting directly from a D to an M is deprecated. We are now decoding directly in a M. - Used errors.As and errors.Is for proper error comparison and assertion - Streamlined function parameters and removed redundancies - Revised sloppy reassignments and added missing error checks - Shortened and reorganized function structure for improved readability
This commit is contained in:
committed by
Dan Christian Bogos
parent
076bb172f1
commit
b5dbb31163
File diff suppressed because it is too large
Load Diff
@@ -20,6 +20,7 @@ package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
@@ -82,7 +83,8 @@ func (ms *MongoStorage) GetTpIds(colName string) (tpids []string, err error) {
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinctIDs utils.TPDistinctIds,
|
||||
filter map[string]string, pag *utils.PaginatorWithSearch) ([]string, error) {
|
||||
filter map[string]string, pag *utils.PaginatorWithSearch,
|
||||
) ([]string, error) {
|
||||
findMap := bson.M{}
|
||||
if tpid != "" {
|
||||
findMap["tpid"] = tpid
|
||||
@@ -129,18 +131,17 @@ func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinctIDs utils.TPDi
|
||||
fop.SetProjection(selectors)
|
||||
|
||||
distinctIds := make(utils.StringMap)
|
||||
if err := ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(table).Find(sctx, findMap, fop)
|
||||
if err != nil {
|
||||
return err
|
||||
err := ms.query(func(sctx mongo.SessionContext) error {
|
||||
cur, queryErr := ms.getCol(table).Find(sctx, findMap, fop)
|
||||
if queryErr != nil {
|
||||
return queryErr
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var elem bson.D
|
||||
err := cur.Decode(&elem)
|
||||
if err != nil {
|
||||
return err
|
||||
var item bson.M
|
||||
queryErr = cur.Decode(&item)
|
||||
if queryErr != nil {
|
||||
return queryErr
|
||||
}
|
||||
item := elem.Map()
|
||||
|
||||
var id string
|
||||
last := len(distinctIDs) - 1
|
||||
@@ -155,7 +156,8 @@ func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinctIDs utils.TPDi
|
||||
distinctIds[id] = true
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
}); err != nil {
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return distinctIds.Slice(), nil
|
||||
@@ -220,29 +222,32 @@ func (ms *MongoStorage) GetTPRates(tpid, id string) ([]*utils.TPRate, error) {
|
||||
if id != "" {
|
||||
filter["id"] = id
|
||||
}
|
||||
var results []*utils.TPRate
|
||||
err := ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.TBLTPRates).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
var tpRates []*utils.TPRate
|
||||
err := ms.query(func(sctx mongo.SessionContext) error {
|
||||
cur, queryErr := ms.getCol(utils.TBLTPRates).Find(sctx, filter)
|
||||
if queryErr != nil {
|
||||
return queryErr
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var el utils.TPRate
|
||||
err := cur.Decode(&el)
|
||||
if err != nil {
|
||||
return err
|
||||
queryErr = cur.Decode(&el)
|
||||
if queryErr != nil {
|
||||
return queryErr
|
||||
}
|
||||
for _, rs := range el.RateSlots {
|
||||
rs.SetDurations()
|
||||
queryErr = rs.SetDurations()
|
||||
if queryErr != nil {
|
||||
return queryErr
|
||||
}
|
||||
}
|
||||
results = append(results, &el)
|
||||
tpRates = append(tpRates, &el)
|
||||
}
|
||||
if len(results) == 0 {
|
||||
if len(tpRates) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
})
|
||||
return results, err
|
||||
return tpRates, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPDestinationRates(tpid, id string, pag *utils.Paginator) ([]*utils.TPDestinationRate, error) {
|
||||
@@ -559,7 +564,7 @@ func (ms *MongoStorage) GetTPAccountActions(tp *utils.TPAccountActions) ([]*util
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) RemTpData(table, tpid string, args map[string]string) error {
|
||||
if len(table) == 0 { // Remove tpid out of all tables
|
||||
if table == "" { // Remove tpid out of all tables
|
||||
return ms.query(func(sctx mongo.SessionContext) error {
|
||||
col, err := ms.DB().ListCollections(sctx, bson.D{}, options.ListCollections().SetNameOnly(true))
|
||||
if err != nil {
|
||||
@@ -644,7 +649,7 @@ func (ms *MongoStorage) SetTPRates(tps []*utils.TPRate) error {
|
||||
m := make(map[string]bool)
|
||||
return ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tps {
|
||||
if found, _ := m[tp.ID]; !found {
|
||||
if found := m[tp.ID]; !found {
|
||||
m[tp.ID] = true
|
||||
_, err := ms.getCol(utils.TBLTPRates).DeleteMany(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID})
|
||||
if err != nil {
|
||||
@@ -667,7 +672,7 @@ func (ms *MongoStorage) SetTPDestinationRates(tps []*utils.TPDestinationRate) er
|
||||
m := make(map[string]bool)
|
||||
return ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tps {
|
||||
if found, _ := m[tp.ID]; !found {
|
||||
if found := m[tp.ID]; !found {
|
||||
m[tp.ID] = true
|
||||
_, err := ms.getCol(utils.TBLTPDestinationRates).DeleteMany(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID})
|
||||
if err != nil {
|
||||
@@ -690,7 +695,7 @@ func (ms *MongoStorage) SetTPRatingPlans(tps []*utils.TPRatingPlan) error {
|
||||
m := make(map[string]bool)
|
||||
return ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tps {
|
||||
if found, _ := m[tp.ID]; !found {
|
||||
if found := m[tp.ID]; !found {
|
||||
m[tp.ID] = true
|
||||
_, err := ms.getCol(utils.TBLTPRatingPlans).DeleteMany(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID})
|
||||
if err != nil {
|
||||
@@ -734,7 +739,7 @@ func (ms *MongoStorage) SetTPSharedGroups(tps []*utils.TPSharedGroups) error {
|
||||
m := make(map[string]bool)
|
||||
return ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tps {
|
||||
if found, _ := m[tp.ID]; !found {
|
||||
if found := m[tp.ID]; !found {
|
||||
m[tp.ID] = true
|
||||
_, err := ms.getCol(utils.TBLTPSharedGroups).DeleteMany(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID})
|
||||
if err != nil {
|
||||
@@ -757,7 +762,7 @@ func (ms *MongoStorage) SetTPActions(tps []*utils.TPActions) error {
|
||||
m := make(map[string]bool)
|
||||
return ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tps {
|
||||
if found, _ := m[tp.ID]; !found {
|
||||
if found := m[tp.ID]; !found {
|
||||
m[tp.ID] = true
|
||||
if _, err := ms.getCol(utils.TBLTPActions).DeleteMany(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}); err != nil {
|
||||
return err
|
||||
@@ -778,7 +783,7 @@ func (ms *MongoStorage) SetTPActionPlans(tps []*utils.TPActionPlan) error {
|
||||
m := make(map[string]bool)
|
||||
return ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tps {
|
||||
if found, _ := m[tp.ID]; !found {
|
||||
if found := m[tp.ID]; !found {
|
||||
m[tp.ID] = true
|
||||
if _, err := ms.getCol(utils.TBLTPActionPlans).DeleteMany(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}); err != nil {
|
||||
return err
|
||||
@@ -799,7 +804,7 @@ func (ms *MongoStorage) SetTPActionTriggers(tps []*utils.TPActionTriggers) error
|
||||
m := make(map[string]bool)
|
||||
return ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tps {
|
||||
if found, _ := m[tp.ID]; !found {
|
||||
if found := m[tp.ID]; !found {
|
||||
m[tp.ID] = true
|
||||
if _, err := ms.getCol(utils.TBLTPActionTriggers).DeleteMany(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID}); err != nil {
|
||||
return err
|
||||
@@ -993,22 +998,21 @@ func (ms *MongoStorage) cleanEmptyFilters(filters bson.M) {
|
||||
}
|
||||
}
|
||||
|
||||
// _, err := col(ColCDRs).UpdateAll(bson.M{CGRIDLow: bson.M{"$in": cgrIds}}, bson.M{"$set": bson.M{"deleted_at": time.Now()}})
|
||||
func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, int64, error) {
|
||||
func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) (cdrs []*CDR, n int64, err error) {
|
||||
var minUsage, maxUsage *time.Duration
|
||||
if len(qryFltr.MinUsage) != 0 {
|
||||
if parsed, err := utils.ParseDurationWithNanosecs(qryFltr.MinUsage); err != nil {
|
||||
if qryFltr.MinUsage != "" {
|
||||
parsedDur, err := utils.ParseDurationWithNanosecs(qryFltr.MinUsage)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
} else {
|
||||
minUsage = &parsed
|
||||
}
|
||||
minUsage = &parsedDur
|
||||
}
|
||||
if len(qryFltr.MaxUsage) != 0 {
|
||||
if parsed, err := utils.ParseDurationWithNanosecs(qryFltr.MaxUsage); err != nil {
|
||||
if qryFltr.MaxUsage != "" {
|
||||
parsedDur, err := utils.ParseDurationWithNanosecs(qryFltr.MaxUsage)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
} else {
|
||||
maxUsage = &parsed
|
||||
}
|
||||
maxUsage = &parsedDur
|
||||
}
|
||||
filters := bson.M{
|
||||
CGRIDLow: bson.M{"$in": qryFltr.CGRIDs, "$nin": qryFltr.NotCGRIDs},
|
||||
@@ -1028,20 +1032,20 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
|
||||
CreatedAtLow: bson.M{"$gte": qryFltr.CreatedAtStart, "$lt": qryFltr.CreatedAtEnd},
|
||||
UpdatedAtLow: bson.M{"$gte": qryFltr.UpdatedAtStart, "$lt": qryFltr.UpdatedAtEnd},
|
||||
UsageLow: bson.M{"$gte": minUsage, "$lt": maxUsage},
|
||||
//CostDetailsLow + "." + AccountLow: bson.M{"$in": qryFltr.RatedAccounts, "$nin": qryFltr.NotRatedAccounts},
|
||||
//CostDetailsLow + "." + SubjectLow: bson.M{"$in": qryFltr.RatedSubjects, "$nin": qryFltr.NotRatedSubjects},
|
||||
// CostDetailsLow + "." + AccountLow: bson.M{"$in": qryFltr.RatedAccounts, "$nin": qryFltr.NotRatedAccounts},
|
||||
// CostDetailsLow + "." + SubjectLow: bson.M{"$in": qryFltr.RatedSubjects, "$nin": qryFltr.NotRatedSubjects},
|
||||
}
|
||||
//file, _ := os.CreateTemp(os.TempDir(), "debug")
|
||||
//file.WriteString(fmt.Sprintf("FILTER: %v\n", utils.ToIJSON(qryFltr)))
|
||||
//file.WriteString(fmt.Sprintf("BEFORE: %v\n", utils.ToIJSON(filters)))
|
||||
// file, _ := os.CreateTemp(os.TempDir(), "debug")
|
||||
// file.WriteString(fmt.Sprintf("FILTER: %v\n", utils.ToIJSON(qryFltr)))
|
||||
// file.WriteString(fmt.Sprintf("BEFORE: %v\n", utils.ToIJSON(filters)))
|
||||
ms.cleanEmptyFilters(filters)
|
||||
if len(qryFltr.DestinationPrefixes) != 0 {
|
||||
var regexpRule string
|
||||
for _, prefix := range qryFltr.DestinationPrefixes {
|
||||
if len(prefix) == 0 {
|
||||
if prefix == "" {
|
||||
continue
|
||||
}
|
||||
if len(regexpRule) != 0 {
|
||||
if regexpRule != "" {
|
||||
regexpRule += "|"
|
||||
}
|
||||
regexpRule += "^(" + regexp.QuoteMeta(prefix) + ")"
|
||||
@@ -1063,7 +1067,7 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
|
||||
filters["$and"] = make([]bson.M, 0)
|
||||
}
|
||||
for _, prefix := range qryFltr.NotDestinationPrefixes {
|
||||
if len(prefix) == 0 {
|
||||
if prefix == "" {
|
||||
continue
|
||||
}
|
||||
filters["$and"] = append(filters["$and"].([]bson.M),
|
||||
@@ -1117,17 +1121,17 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
|
||||
filters[CostLow] = bson.M{"$lt": *qryFltr.MaxCost}
|
||||
}
|
||||
}
|
||||
//file.WriteString(fmt.Sprintf("AFTER: %v\n", utils.ToIJSON(filters)))
|
||||
//file.Close()
|
||||
// file.WriteString(fmt.Sprintf("AFTER: %v\n", utils.ToIJSON(filters)))
|
||||
// file.Close()
|
||||
if remove {
|
||||
var chgd int64
|
||||
err := ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
dr, err := ms.getCol(ColCDRs).DeleteMany(sctx, filters)
|
||||
if err != nil {
|
||||
return err
|
||||
err = ms.query(func(sctx mongo.SessionContext) (queryErr error) {
|
||||
dr, queryErr := ms.getCol(ColCDRs).DeleteMany(sctx, filters)
|
||||
if queryErr != nil {
|
||||
return queryErr
|
||||
}
|
||||
chgd = dr.DeletedCount
|
||||
return err
|
||||
return queryErr
|
||||
})
|
||||
return nil, chgd, err
|
||||
}
|
||||
@@ -1148,7 +1152,6 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
|
||||
ordVal := 1
|
||||
if len(separateVals) == 2 && separateVals[1] == "desc" {
|
||||
ordVal = -1
|
||||
// orderVal += "-"
|
||||
}
|
||||
switch separateVals[0] {
|
||||
case utils.OrderID:
|
||||
@@ -1162,26 +1165,27 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
|
||||
case utils.Cost:
|
||||
orderVal += "cost"
|
||||
default:
|
||||
return nil, 0, fmt.Errorf("Invalid value : %s", separateVals[0])
|
||||
return nil, 0, fmt.Errorf("invalid value : %s", separateVals[0])
|
||||
}
|
||||
fop = fop.SetSort(bson.M{orderVal: ordVal})
|
||||
}
|
||||
if qryFltr.Count {
|
||||
var cnt int64
|
||||
if err := ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
cnt, err = ms.getCol(ColCDRs).CountDocuments(sctx, filters, cop)
|
||||
return err
|
||||
}); err != nil {
|
||||
err = ms.query(func(sctx mongo.SessionContext) error {
|
||||
var queryErr error
|
||||
cnt, queryErr = ms.getCol(ColCDRs).CountDocuments(sctx, filters, cop)
|
||||
return queryErr
|
||||
})
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return nil, cnt, nil
|
||||
}
|
||||
// Execute query
|
||||
var cdrs []*CDR
|
||||
err := ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(ColCDRs).Find(sctx, filters, fop)
|
||||
if err != nil {
|
||||
return err
|
||||
err = ms.query(func(sctx mongo.SessionContext) error {
|
||||
cur, queryErr := ms.getCol(ColCDRs).Find(sctx, filters, fop)
|
||||
if queryErr != nil {
|
||||
return queryErr
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
cdr := CDR{}
|
||||
@@ -1555,37 +1559,40 @@ func (ms *MongoStorage) SetTPDispatcherHosts(tpDPPs []*utils.TPDispatcherHost) (
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetVersions(itm string) (vrs Versions, err error) {
|
||||
func (ms *MongoStorage) GetVersions(itm string) (Versions, error) {
|
||||
fop := options.FindOne()
|
||||
if itm != "" {
|
||||
fop.SetProjection(bson.M{itm: 1, "_id": 0})
|
||||
} else {
|
||||
fop.SetProjection(bson.M{"_id": 0})
|
||||
}
|
||||
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
cur := ms.getCol(ColVer).FindOne(sctx, bson.D{}, fop)
|
||||
if err := cur.Decode(&vrs); err != nil {
|
||||
if err == mongo.ErrNoDocuments {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return err
|
||||
var vrs Versions
|
||||
err := ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
sr := ms.getCol(ColVer).FindOne(sctx, bson.D{}, fop)
|
||||
decodeErr := sr.Decode(&vrs)
|
||||
if errors.Is(decodeErr, mongo.ErrNoDocuments) {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return decodeErr
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(vrs) == 0 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return
|
||||
return vrs, nil
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) (err error) {
|
||||
func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) error {
|
||||
if overwrite {
|
||||
ms.RemoveVersions(nil)
|
||||
err := ms.RemoveVersions(nil)
|
||||
if err != nil && !errors.Is(err, utils.ErrNotFound) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return ms.query(func(sctx mongo.SessionContext) (err error) {
|
||||
_, err = ms.getCol(ColVer).UpdateOne(sctx, bson.D{}, bson.M{"$set": vrs},
|
||||
return ms.query(func(sctx mongo.SessionContext) error {
|
||||
_, err := ms.getCol(ColVer).UpdateOne(sctx, bson.D{}, bson.M{"$set": vrs},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
return err
|
||||
|
||||
Reference in New Issue
Block a user