Updated mongodb driver

This commit is contained in:
Trial97
2019-03-12 09:26:23 +02:00
committed by Dan Christian Bogos
parent 8dcbd3a67b
commit f4f5e6b061
2 changed files with 179 additions and 478 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -48,10 +48,8 @@ func (ms *MongoStorage) GetTpIds(colName string) (tpids []string, err error) {
}
tpidMap := make(map[string]struct{})
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
if colName == "" {
if err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) error {
if err := ms.query(func(sctx mongo.SessionContext) error {
col, err := ms.DB().ListCollections(sctx, bson.D{}, options.ListCollections().SetNameOnly(true))
if err != nil {
return err
@@ -70,7 +68,7 @@ func (ms *MongoStorage) GetTpIds(colName string) (tpids []string, err error) {
return nil, err
}
} else {
if err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) error {
if err := ms.query(func(sctx mongo.SessionContext) error {
tpidMap, err = getTpIDs(sctx, colName, tpidMap)
return err
}); err != nil {
@@ -121,9 +119,7 @@ func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinct utils.TPDisti
fop.SetProjection(selectors)
distinctIds := make(utils.StringMap)
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
if err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
if err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(table).Find(sctx, findMap, fop)
if err != nil {
return err
@@ -161,9 +157,7 @@ func (ms *MongoStorage) GetTPTimings(tpid, id string) ([]*utils.ApierTPTiming, e
filter["id"] = id
}
var results []*utils.ApierTPTiming
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPTimings).Find(sctx, filter)
if err != nil {
return err
@@ -190,9 +184,7 @@ func (ms *MongoStorage) GetTPDestinations(tpid, id string) ([]*utils.TPDestinati
filter["id"] = id
}
var results []*utils.TPDestination
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPDestinations).Find(sctx, filter)
if err != nil {
return err
@@ -219,9 +211,7 @@ func (ms *MongoStorage) GetTPRates(tpid, id string) ([]*utils.TPRate, error) {
filter["id"] = id
}
var results []*utils.TPRate
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPRates).Find(sctx, filter)
if err != nil {
return err
@@ -260,9 +250,7 @@ func (ms *MongoStorage) GetTPDestinationRates(tpid, id string, pag *utils.Pagina
fop = fop.SetSkip(int64(*pag.Offset))
}
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPDestinationRates).Find(sctx, filter, fop)
if err != nil {
return err
@@ -298,9 +286,7 @@ func (ms *MongoStorage) GetTPRatingPlans(tpid, id string, pag *utils.Paginator)
fop = fop.SetSkip(int64(*pag.Offset))
}
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPRatingPlans).Find(sctx, filter, fop)
if err != nil {
return err
@@ -336,9 +322,7 @@ func (ms *MongoStorage) GetTPRatingProfiles(tp *utils.TPRatingProfile) ([]*utils
filter["loadid"] = tp.LoadId
}
var results []*utils.TPRatingProfile
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPRateProfiles).Find(sctx, filter)
if err != nil {
return err
@@ -365,9 +349,7 @@ func (ms *MongoStorage) GetTPSharedGroups(tpid, id string) ([]*utils.TPSharedGro
filter["id"] = id
}
var results []*utils.TPSharedGroups
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPSharedGroups).Find(sctx, filter)
if err != nil {
return err
@@ -397,9 +379,7 @@ func (ms *MongoStorage) GetTPResources(tpid, tenant, id string) ([]*utils.TPReso
filter["tenant"] = tenant
}
var results []*utils.TPResource
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPResources).Find(sctx, filter)
if err != nil {
return err
@@ -431,9 +411,7 @@ func (ms *MongoStorage) GetTPStats(tpid, tenant, id string) ([]*utils.TPStats, e
filter["tenant"] = tenant
}
var results []*utils.TPStats
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPStats).Find(sctx, filter)
if err != nil {
return err
@@ -460,9 +438,7 @@ func (ms *MongoStorage) GetTPActions(tpid, id string) ([]*utils.TPActions, error
filter["id"] = id
}
var results []*utils.TPActions
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPActions).Find(sctx, filter)
if err != nil {
return err
@@ -489,9 +465,7 @@ func (ms *MongoStorage) GetTPActionPlans(tpid, id string) ([]*utils.TPActionPlan
filter["id"] = id
}
var results []*utils.TPActionPlan
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPActionPlans).Find(sctx, filter)
if err != nil {
return err
@@ -520,9 +494,7 @@ func (ms *MongoStorage) GetTPActionTriggers(tpid, id string) ([]*utils.TPActionT
filter["id"] = id
}
var results []*utils.TPActionTriggers
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPActionTriggers).Find(sctx, filter)
if err != nil {
return err
@@ -555,9 +527,7 @@ func (ms *MongoStorage) GetTPAccountActions(tp *utils.TPAccountActions) ([]*util
filter["loadid"] = tp.LoadId
}
var results []*utils.TPAccountActions
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPAccountActions).Find(sctx, filter)
if err != nil {
return err
@@ -580,9 +550,7 @@ func (ms *MongoStorage) GetTPAccountActions(tp *utils.TPAccountActions) ([]*util
func (ms *MongoStorage) RemTpData(table, tpid string, args map[string]string) error {
if len(table) == 0 { // Remove tpid out of all tables
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) error {
return ms.query(func(sctx mongo.SessionContext) error {
col, err := ms.DB().ListCollections(sctx, bson.D{}, options.ListCollections().SetNameOnly(true))
if err != nil {
return err
@@ -614,9 +582,7 @@ func (ms *MongoStorage) RemTpData(table, tpid string, args map[string]string) er
if tpid != "" {
args["tpid"] = tpid
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
dr, err := ms.getCol(table).DeleteOne(sctx, args)
if dr.DeletedCount == 0 {
return utils.ErrNotFound
@@ -629,9 +595,7 @@ func (ms *MongoStorage) SetTPTimings(tps []*utils.ApierTPTiming) error {
if len(tps) == 0 {
return nil
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tps {
_, err = ms.getCol(utils.TBLTPTimings).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
bson.M{"$set": tp},
@@ -649,9 +613,7 @@ func (ms *MongoStorage) SetTPDestinations(tpDsts []*utils.TPDestination) (err er
if len(tpDsts) == 0 {
return nil
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tpDsts {
_, err = ms.getCol(utils.TBLTPDestinations).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
bson.M{"$set": tp},
@@ -670,9 +632,7 @@ func (ms *MongoStorage) SetTPRates(tps []*utils.TPRate) error {
return nil
}
m := make(map[string]bool)
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tps {
if found, _ := m[tp.ID]; !found {
m[tp.ID] = true
@@ -695,9 +655,7 @@ func (ms *MongoStorage) SetTPDestinationRates(tps []*utils.TPDestinationRate) er
return nil
}
m := make(map[string]bool)
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tps {
if found, _ := m[tp.ID]; !found {
m[tp.ID] = true
@@ -720,9 +678,7 @@ func (ms *MongoStorage) SetTPRatingPlans(tps []*utils.TPRatingPlan) error {
return nil
}
m := make(map[string]bool)
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tps {
if found, _ := m[tp.ID]; !found {
m[tp.ID] = true
@@ -744,9 +700,7 @@ func (ms *MongoStorage) SetTPRatingProfiles(tps []*utils.TPRatingProfile) error
if len(tps) == 0 {
return nil
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tps {
_, err = ms.getCol(utils.TBLTPRateProfiles).UpdateOne(sctx, bson.M{
"tpid": tp.TPid,
@@ -768,9 +722,7 @@ func (ms *MongoStorage) SetTPSharedGroups(tps []*utils.TPSharedGroups) error {
return nil
}
m := make(map[string]bool)
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tps {
if found, _ := m[tp.ID]; !found {
m[tp.ID] = true
@@ -793,9 +745,7 @@ func (ms *MongoStorage) SetTPActions(tps []*utils.TPActions) error {
return nil
}
m := make(map[string]bool)
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tps {
if found, _ := m[tp.ID]; !found {
m[tp.ID] = true
@@ -816,9 +766,7 @@ func (ms *MongoStorage) SetTPActionPlans(tps []*utils.TPActionPlan) error {
return nil
}
m := make(map[string]bool)
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tps {
if found, _ := m[tp.ID]; !found {
m[tp.ID] = true
@@ -839,9 +787,7 @@ func (ms *MongoStorage) SetTPActionTriggers(tps []*utils.TPActionTriggers) error
return nil
}
m := make(map[string]bool)
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tps {
if found, _ := m[tp.ID]; !found {
m[tp.ID] = true
@@ -861,9 +807,7 @@ func (ms *MongoStorage) SetTPAccountActions(tps []*utils.TPAccountActions) error
if len(tps) == 0 {
return nil
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tps {
_, err = ms.getCol(utils.TBLTPAccountActions).UpdateOne(sctx, bson.M{
"tpid": tp.TPid,
@@ -883,9 +827,7 @@ func (ms *MongoStorage) SetTPResources(tpRLs []*utils.TPResource) (err error) {
if len(tpRLs) == 0 {
return
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tpRLs {
_, err = ms.getCol(utils.TBLTPResources).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
bson.M{"$set": tp}, options.Update().SetUpsert(true))
@@ -901,9 +843,7 @@ func (ms *MongoStorage) SetTPRStats(tps []*utils.TPStats) (err error) {
if len(tps) == 0 {
return
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tps {
_, err = ms.getCol(utils.TBLTPStats).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
bson.M{"$set": tp}, options.Update().SetUpsert(true))
@@ -919,9 +859,7 @@ func (ms *MongoStorage) SetSMCost(smc *SMCost) error {
if smc.CostDetails == nil {
return nil
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
_, err = ms.getCol(utils.SessionCostsTBL).InsertOne(sctx, smc)
return err
})
@@ -932,9 +870,7 @@ func (ms *MongoStorage) RemoveSMCost(smc *SMCost) error {
if smc != nil {
remParams = bson.M{"cgrid": smc.CGRID, "runid": smc.RunID}
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
_, err = ms.getCol(utils.SessionCostsTBL).DeleteMany(sctx, remParams)
return err
})
@@ -954,9 +890,7 @@ func (ms *MongoStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri
if originIDPrefix != "" {
filter[OriginIDLow] = bsonx.Regex(fmt.Sprintf("^%s", originIDPrefix), "")
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err = ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err = ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.SessionCostsTBL).Find(sctx, filter)
if err != nil {
return err
@@ -982,9 +916,7 @@ func (ms *MongoStorage) SetCDR(cdr *CDR, allowUpdate bool) (err error) {
if cdr.OrderID == 0 {
cdr.OrderID = ms.cnter.Next()
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
if allowUpdate {
_, err = ms.getCol(ColCDRs).UpdateOne(sctx,
bson.M{CGRIDLow: cdr.CGRID, RunIDLow: cdr.RunID},
@@ -1144,9 +1076,7 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
//file.Close()
if remove {
var chgd int64
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
dr, err := ms.getCol(ColCDRs).DeleteMany(sctx, filters)
chgd = dr.DeletedCount
return err
@@ -1190,9 +1120,7 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
}
if qryFltr.Count {
var cnt int64
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
if err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
if err := ms.query(func(sctx mongo.SessionContext) (err error) {
cnt, err = ms.getCol(ColCDRs).Count(sctx, filters, cop)
return err
}); err != nil {
@@ -1202,9 +1130,7 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
}
// Execute query
var cdrs []*CDR
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(ColCDRs).Find(sctx, filters, fop)
if err != nil {
return err
@@ -1230,9 +1156,7 @@ func (ms *MongoStorage) SetTPStats(tpSTs []*utils.TPStats) (err error) {
if len(tpSTs) == 0 {
return
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tpSTs {
_, err = ms.getCol(utils.TBLTPStats).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
bson.M{"$set": tp},
@@ -1255,9 +1179,7 @@ func (ms *MongoStorage) GetTPThresholds(tpid, tenant, id string) ([]*utils.TPThr
filter["tenant"] = tenant
}
var results []*utils.TPThreshold
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPThresholds).Find(sctx, filter)
if err != nil {
return err
@@ -1282,9 +1204,7 @@ func (ms *MongoStorage) SetTPThresholds(tpTHs []*utils.TPThreshold) (err error)
if len(tpTHs) == 0 {
return
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tpTHs {
_, err = ms.getCol(utils.TBLTPThresholds).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
bson.M{"$set": tp},
@@ -1307,9 +1227,7 @@ func (ms *MongoStorage) GetTPFilters(tpid, tenant, id string) ([]*utils.TPFilter
filter["tenant"] = tenant
}
results := []*utils.TPFilterProfile{}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPFilters).Find(sctx, filter)
if err != nil {
return err
@@ -1334,9 +1252,7 @@ func (ms *MongoStorage) SetTPFilters(tpTHs []*utils.TPFilterProfile) (err error)
if len(tpTHs) == 0 {
return
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tpTHs {
_, err = ms.getCol(utils.TBLTPFilters).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
bson.M{"$set": tp},
@@ -1359,9 +1275,7 @@ func (ms *MongoStorage) GetTPSuppliers(tpid, tenant, id string) ([]*utils.TPSupp
filter["tenant"] = tenant
}
var results []*utils.TPSupplierProfile
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPSuppliers).Find(sctx, filter)
if err != nil {
return err
@@ -1386,9 +1300,7 @@ func (ms *MongoStorage) SetTPSuppliers(tpSPs []*utils.TPSupplierProfile) (err er
if len(tpSPs) == 0 {
return
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tpSPs {
_, err = ms.getCol(utils.TBLTPSuppliers).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
bson.M{"$set": tp},
@@ -1411,9 +1323,7 @@ func (ms *MongoStorage) GetTPAttributes(tpid, tenant, id string) ([]*utils.TPAtt
filter["tenant"] = tenant
}
var results []*utils.TPAttributeProfile
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPAttributes).Find(sctx, filter)
if err != nil {
return err
@@ -1438,9 +1348,7 @@ func (ms *MongoStorage) SetTPAttributes(tpSPs []*utils.TPAttributeProfile) (err
if len(tpSPs) == 0 {
return
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tpSPs {
_, err = ms.getCol(utils.TBLTPAttributes).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
bson.M{"$set": tp},
@@ -1463,9 +1371,7 @@ func (ms *MongoStorage) GetTPChargers(tpid, tenant, id string) ([]*utils.TPCharg
filter["tenant"] = tenant
}
var results []*utils.TPChargerProfile
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPChargers).Find(sctx, filter)
if err != nil {
return err
@@ -1490,9 +1396,7 @@ func (ms *MongoStorage) SetTPChargers(tpCPP []*utils.TPChargerProfile) (err erro
if len(tpCPP) == 0 {
return
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tpCPP {
_, err = ms.getCol(utils.TBLTPChargers).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
bson.M{"$set": tp},
@@ -1515,9 +1419,7 @@ func (ms *MongoStorage) GetTPDispatchers(tpid, tenant, id string) ([]*utils.TPDi
filter["tenant"] = tenant
}
var results []*utils.TPDispatcherProfile
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
err := ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
err := ms.query(func(sctx mongo.SessionContext) (err error) {
cur, err := ms.getCol(utils.TBLTPDispatchers).Find(sctx, filter)
if err != nil {
return err
@@ -1542,9 +1444,7 @@ func (ms *MongoStorage) SetTPDispatchers(tpDPPs []*utils.TPDispatcherProfile) (e
if len(tpDPPs) == 0 {
return
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for _, tp := range tpDPPs {
_, err = ms.getCol(utils.TBLTPDispatchers).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
bson.M{"$set": tp},
@@ -1565,9 +1465,7 @@ func (ms *MongoStorage) GetVersions(itm string) (vrs Versions, err error) {
} else {
fop.SetProjection(bson.M{"_id": 0})
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
if err = ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
if err = ms.query(func(sctx mongo.SessionContext) (err error) {
cur := ms.getCol(colVer).FindOne(sctx, bson.D{}, fop)
if err := cur.Decode(&vrs); err != nil {
if err == mongo.ErrNoDocuments {
@@ -1589,16 +1487,14 @@ func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) (err error) {
if overwrite {
ms.RemoveVersions(nil)
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
_, err = ms.getCol(colVer).UpdateOne(sctx, bson.D{}, bson.M{"$set": vrs},
options.Update().SetUpsert(true),
)
return err
})
// }
// return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) error {
// return ms.query( func(sctx mongo.SessionContext) error {
// _, err := ms.getCol(colVer).InsertOne(sctx, vrs)
// return err
// })
@@ -1607,9 +1503,7 @@ func (ms *MongoStorage) SetVersions(vrs Versions, overwrite bool) (err error) {
func (ms *MongoStorage) RemoveVersions(vrs Versions) (err error) {
if len(vrs) == 0 {
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
dr, err := ms.getCol(colVer).DeleteOne(sctx, bson.D{})
if dr.DeletedCount == 0 {
return utils.ErrNotFound
@@ -1617,9 +1511,7 @@ func (ms *MongoStorage) RemoveVersions(vrs Versions) (err error) {
return err
})
}
ctxSession, ctxSessionCancel := context.WithTimeout(ms.ctx, ms.ctxTTL)
defer ctxSessionCancel()
return ms.client.UseSession(ctxSession, func(sctx mongo.SessionContext) (err error) {
return ms.query(func(sctx mongo.SessionContext) (err error) {
for k := range vrs {
if _, err = ms.getCol(colVer).UpdateOne(sctx, bson.D{}, bson.M{"$unset": bson.M{k: 1}},
options.Update().SetUpsert(true)); err != nil {