From fb6b38850c3c024a635e8101b46e7f348505bea1 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 7 Dec 2023 12:43:26 -0500 Subject: [PATCH] Update mongo SetCDR method to take into account allowUpdate param --- engine/storage_mongo_stordb.go | 83 ++++++++++++++++++++++++---------- 1 file changed, 60 insertions(+), 23 deletions(-) diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index a0beb80de..a84a20a64 100644 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -19,6 +19,7 @@ along with this program. If not, see package engine import ( + "errors" "fmt" "strings" "time" @@ -104,36 +105,72 @@ func (ms *MongoStorage) GetStorageType() string { return utils.MetaMongo } -func (ms *MongoStorage) SetCDR(cdr *utils.CGREvent, allowUpdate bool) error { +// SetCDR inserts or updates a CDR in MongoDB. +// If a CDR with the same *cdrID already exists and allowUpdate is true, it updates the existing CDR. +// If allowUpdate is false and a CDR with the same *cdrID exists, it returns an EXISTS error. +func (ms *MongoStorage) SetCDR(ctx *context.Context, cdr *utils.CGREvent, allowUpdate bool) error { + + // Assign a new order ID if it's not already set. if val, has := cdr.Event[utils.OrderID]; has && val == 0 { cdr.Event[utils.OrderID] = ms.counter.Next() } - cdrTable := &CDR{ - Tenant: cdr.Tenant, - Opts: cdr.APIOpts, - Event: cdr.Event, - CreatedAt: time.Now(), - } - return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { - /* - if allowUpdate { - cdrTable.UpdatedAt = time.Now() - _, err = ms.getCol(ColCDRs).UpdateOne(sctx, - //bson.M{"_id": cdrTable.} - //bson.M{CGRIDLow: utils.IfaceAsString(cdr.Event[utils.CGRID])}, - bson.M{"$set": cdrTable}, options.Update().SetUpsert(true)) - return + + return ms.query(ctx, func(sctx mongo.SessionContext) error { + + // Capture the current time once to use for both CreatedAt and UpdatedAt. + currentTime := time.Now() + + _, err := ms.getCol(ColCDRs).InsertOne( + sctx, + &CDR{ + Tenant: cdr.Tenant, + Opts: cdr.APIOpts, + Event: cdr.Event, + CreatedAt: currentTime, + UpdatedAt: currentTime, + }, + ) + if err != nil && isMongoDuplicateError(err) { + if !allowUpdate { + return utils.ErrExists } - */ - _, err = ms.getCol(ColCDRs).InsertOne(sctx, cdrTable) - if err != nil && strings.Contains(err.Error(), "E11000") { // Mongo returns E11000 when key is duplicated - err = utils.ErrExists + + // Prepare an update operation that excludes the CreatedAt field. + update := bson.M{"$set": bson.M{ + "tenant": cdr.Tenant, + "opts": cdr.APIOpts, + "event": cdr.Event, + "updatedAt": currentTime, + }} + + _, err = ms.getCol(ColCDRs).UpdateOne( + sctx, + bson.M{ + "opts.*cdrID": utils.IfaceAsString(cdr.APIOpts[utils.MetaCDRID]), + }, + update, + options.Update().SetUpsert(true), + ) + return err } - return + return err }) } -func (ms *MongoStorage) GetCDRs(_ *context.Context, qryFltr []*Filter, opts map[string]interface{}) (cdrs []*CDR, err error) { +// isMongoDuplicateError checks if the provided error is a MongoDB duplicate key error. +func isMongoDuplicateError(err error) bool { + var e mongo.WriteException + if errors.As(err, &e) { + for _, we := range e.WriteErrors { + if we.Code == 11000 { // MongoDB error code for duplicate key. + return true + } + } + } + return false +} + +func (ms *MongoStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]interface{}) (cdrs []*CDR, err error) { fltrs := make(bson.M) for _, fltr := range qryFltr { for _, rule := range fltr.Rules { @@ -167,7 +204,7 @@ func (ms *MongoStorage) GetCDRs(_ *context.Context, qryFltr []*Filter, opts map[ // cop.SetSkip(int64(offset)) // Execute query - err = ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { + err = ms.query(ctx, func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(ColCDRs).Find(sctx, fltrs, fop) if err != nil { return err