diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index cf2da1f0e..776139331 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -25,6 +25,7 @@ import ( "fmt" "io/ioutil" "strings" + "time" "github.com/cgrates/cgrates/cache2go" "github.com/cgrates/cgrates/utils" @@ -86,21 +87,35 @@ var ( type MongoStorage struct { session *mgo.Session - db *mgo.Database + db string ms Marshaler } +func (ms *MongoStorage) conn(col string) (*mgo.Session, *mgo.Collection) { + sessionCopy := ms.session.Copy() + return sessionCopy, sessionCopy.DB(ms.db).C(col) +} + func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string) (*MongoStorage, error) { + + // We need this object to establish a session to our MongoDB. address := fmt.Sprintf("%s:%s", host, port) - if user != "" && pass != "" { - address = fmt.Sprintf("%s:%s@%s", user, pass, address) + mongoDBDialInfo := &mgo.DialInfo{ + Addrs: []string{address}, + Timeout: 60 * time.Second, + Database: db, + Username: user, + Password: pass, } - session, err := mgo.Dial(address) + + // Create a session which maintains a pool of socket connections + // to our MongoDB. + session, err := mgo.DialWithInfo(mongoDBDialInfo) if err != nil { return nil, err } ndb := session.DB(db) - //session.SetMode(mgo.Monotonic, true) + session.SetMode(mgo.Monotonic, true) index := mgo.Index{ Key: []string{"key"}, Unique: true, // Prevent two documents from having the same index key @@ -260,7 +275,7 @@ func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string) (* if err = ndb.C(utils.TBLSMCosts).EnsureIndex(index); err != nil { return nil, err } - return &MongoStorage{db: ndb, session: session, ms: NewCodecMsgpackMarshaler()}, err + return &MongoStorage{db: db, session: session, ms: NewCodecMsgpackMarshaler()}, err } func (ms *MongoStorage) Close() { @@ -278,47 +293,50 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string, skipCache bool) ([]strin } var result []string if skipCache { + session := ms.session.Copy() + defer session.Close() + db := session.DB(ms.db) keyResult := struct{ Key string }{} idResult := struct{ Id string }{} switch category { case utils.DESTINATION_PREFIX: - iter := ms.db.C(colDst).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter() + iter := db.C(colDst).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter() for iter.Next(&keyResult) { result = append(result, utils.DESTINATION_PREFIX+keyResult.Key) } return result, nil case utils.RATING_PLAN_PREFIX: - iter := ms.db.C(colRpl).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter() + iter := db.C(colRpl).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter() for iter.Next(&keyResult) { result = append(result, utils.RATING_PLAN_PREFIX+keyResult.Key) } return result, nil case utils.RATING_PROFILE_PREFIX: - iter := ms.db.C(colRpf).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter() + iter := db.C(colRpf).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter() for iter.Next(&idResult) { result = append(result, utils.RATING_PROFILE_PREFIX+idResult.Id) } return result, nil case utils.ACTION_PREFIX: - iter := ms.db.C(colAct).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter() + iter := db.C(colAct).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter() for iter.Next(&keyResult) { result = append(result, utils.ACTION_PREFIX+keyResult.Key) } return result, nil case utils.ACTION_PLAN_PREFIX: - iter := ms.db.C(colApl).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter() + iter := db.C(colApl).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter() for iter.Next(&keyResult) { result = append(result, utils.ACTION_PLAN_PREFIX+keyResult.Key) } return result, nil case utils.ACTION_TRIGGER_PREFIX: - iter := ms.db.C(colAtr).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter() + iter := db.C(colAtr).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter() for iter.Next(&keyResult) { result = append(result, utils.ACTION_TRIGGER_PREFIX+keyResult.Key) } return result, nil case utils.ACCOUNT_PREFIX: - iter := ms.db.C(colAcc).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter() + iter := db.C(colAcc).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter() for iter.Next(&idResult) { result = append(result, utils.ACCOUNT_PREFIX+idResult.Id) } @@ -330,12 +348,15 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string, skipCache bool) ([]strin } func (ms *MongoStorage) Flush(ignore string) (err error) { - collections, err := ms.db.CollectionNames() + session := ms.session.Copy() + defer session.Close() + db := session.DB(ms.db) + collections, err := db.CollectionNames() if err != nil { return err } for _, c := range collections { - if err = ms.db.C(c).DropCollection(); err != nil { + if err = db.C(c).DropCollection(); err != nil { return err } } @@ -390,17 +411,20 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.BeginTransaction() keyResult := struct{ Key string }{} idResult := struct{ Id string }{} + session := ms.session.Copy() + defer session.Close() + db := session.DB(ms.db) if dKeys == nil || (float64(cache2go.CountEntries(utils.DESTINATION_PREFIX))*utils.DESTINATIONS_LOAD_THRESHOLD < float64(len(dKeys))) { // if need to load more than a half of exiting keys load them all utils.Logger.Info("Caching all destinations") - iter := ms.db.C(colDst).Find(nil).Select(bson.M{"key": 1}).Iter() + iter := db.C(colDst).Find(nil).Select(bson.M{"key": 1}).Iter() dKeys = make([]string, 0) for iter.Next(&keyResult) { dKeys = append(dKeys, utils.DESTINATION_PREFIX+keyResult.Key) } if err := iter.Close(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("destinations: %s", err.Error()) } cache2go.RemPrefixKey(utils.DESTINATION_PREFIX) } else if len(dKeys) != 0 { @@ -414,7 +438,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac } if _, err = ms.GetDestination(key[len(utils.DESTINATION_PREFIX):]); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("destinations: %s", err.Error()) } } if len(dKeys) != 0 { @@ -422,14 +446,14 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac } if rpKeys == nil { utils.Logger.Info("Caching all rating plans") - iter := ms.db.C(colRpl).Find(nil).Select(bson.M{"key": 1}).Iter() + iter := db.C(colRpl).Find(nil).Select(bson.M{"key": 1}).Iter() rpKeys = make([]string, 0) for iter.Next(&keyResult) { rpKeys = append(rpKeys, utils.RATING_PLAN_PREFIX+keyResult.Key) } if err := iter.Close(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("rating plans: %s", err.Error()) } cache2go.RemPrefixKey(utils.RATING_PLAN_PREFIX) } else if len(rpKeys) != 0 { @@ -439,7 +463,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.RemKey(key) if _, err = ms.GetRatingPlan(key[len(utils.RATING_PLAN_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("rating plans: %s", err.Error()) } } if len(rpKeys) != 0 { @@ -447,14 +471,14 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac } if rpfKeys == nil { utils.Logger.Info("Caching all rating profiles") - iter := ms.db.C(colRpf).Find(nil).Select(bson.M{"id": 1}).Iter() + iter := db.C(colRpf).Find(nil).Select(bson.M{"id": 1}).Iter() rpfKeys = make([]string, 0) for iter.Next(&idResult) { rpfKeys = append(rpfKeys, utils.RATING_PROFILE_PREFIX+idResult.Id) } if err := iter.Close(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("rating profiles: %s", err.Error()) } cache2go.RemPrefixKey(utils.RATING_PROFILE_PREFIX) } else if len(rpfKeys) != 0 { @@ -464,7 +488,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.RemKey(key) if _, err = ms.GetRatingProfile(key[len(utils.RATING_PROFILE_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("rating profiles: %s", err.Error()) } } if len(rpfKeys) != 0 { @@ -472,14 +496,14 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac } if lcrKeys == nil { utils.Logger.Info("Caching LCR rules.") - iter := ms.db.C(colLcr).Find(nil).Select(bson.M{"key": 1}).Iter() + iter := db.C(colLcr).Find(nil).Select(bson.M{"key": 1}).Iter() lcrKeys = make([]string, 0) for iter.Next(&keyResult) { lcrKeys = append(lcrKeys, utils.LCR_PREFIX+keyResult.Key) } if err := iter.Close(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("lcr rules: %s", err.Error()) } cache2go.RemPrefixKey(utils.LCR_PREFIX) } else if len(lcrKeys) != 0 { @@ -489,7 +513,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.RemKey(key) if _, err = ms.GetLCR(key[len(utils.LCR_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("lcr rules: %s", err.Error()) } } if len(lcrKeys) != 0 { @@ -498,14 +522,14 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac // DerivedChargers caching if dcsKeys == nil { utils.Logger.Info("Caching all derived chargers") - iter := ms.db.C(colDcs).Find(nil).Select(bson.M{"key": 1}).Iter() + iter := db.C(colDcs).Find(nil).Select(bson.M{"key": 1}).Iter() dcsKeys = make([]string, 0) for iter.Next(&keyResult) { dcsKeys = append(dcsKeys, utils.DERIVEDCHARGERS_PREFIX+keyResult.Key) } if err := iter.Close(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("derived chargers: %s", err.Error()) } cache2go.RemPrefixKey(utils.DERIVEDCHARGERS_PREFIX) } else if len(dcsKeys) != 0 { @@ -515,7 +539,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.RemKey(key) if _, err = ms.GetDerivedChargers(key[len(utils.DERIVEDCHARGERS_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("derived chargers: %s", err.Error()) } } if len(dcsKeys) != 0 { @@ -526,14 +550,14 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac } if actKeys == nil { utils.Logger.Info("Caching all actions") - iter := ms.db.C(colAct).Find(nil).Select(bson.M{"key": 1}).Iter() + iter := db.C(colAct).Find(nil).Select(bson.M{"key": 1}).Iter() actKeys = make([]string, 0) for iter.Next(&keyResult) { actKeys = append(actKeys, utils.ACTION_PREFIX+keyResult.Key) } if err := iter.Close(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("actions: %s", err.Error()) } cache2go.RemPrefixKey(utils.ACTION_PREFIX) } else if len(actKeys) != 0 { @@ -543,7 +567,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.RemKey(key) if _, err = ms.GetActions(key[len(utils.ACTION_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("actions: %s", err.Error()) } } if len(actKeys) != 0 { @@ -555,14 +579,14 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac } if aplKeys == nil { utils.Logger.Info("Caching all action plans") - iter := ms.db.C(colApl).Find(nil).Select(bson.M{"key": 1}).Iter() + iter := db.C(colApl).Find(nil).Select(bson.M{"key": 1}).Iter() aplKeys = make([]string, 0) for iter.Next(&keyResult) { aplKeys = append(aplKeys, utils.ACTION_PLAN_PREFIX+keyResult.Key) } if err := iter.Close(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("action plans: %s", err.Error()) } cache2go.RemPrefixKey(utils.ACTION_PLAN_PREFIX) } else if len(aplKeys) != 0 { @@ -572,7 +596,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.RemKey(key) if _, err = ms.GetActionPlan(key[len(utils.ACTION_PLAN_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("action plans: %s", err.Error()) } } if len(aplKeys) != 0 { @@ -584,14 +608,14 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac } if shgKeys == nil { utils.Logger.Info("Caching all shared groups") - iter := ms.db.C(colShg).Find(nil).Select(bson.M{"id": 1}).Iter() + iter := db.C(colShg).Find(nil).Select(bson.M{"id": 1}).Iter() shgKeys = make([]string, 0) for iter.Next(&idResult) { shgKeys = append(shgKeys, utils.SHARED_GROUP_PREFIX+idResult.Id) } if err := iter.Close(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("shared groups: %s", err.Error()) } } else if len(shgKeys) != 0 { utils.Logger.Info(fmt.Sprintf("Caching shared groups: %v", shgKeys)) @@ -600,7 +624,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.RemKey(key) if _, err = ms.GetSharedGroup(key[len(utils.SHARED_GROUP_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("shared groups: %s", err.Error()) } } if len(shgKeys) != 0 { @@ -646,16 +670,19 @@ func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) { if alsKeys == nil { cache2go.RemPrefixKey(utils.ALIASES_PREFIX) } + session := ms.session.Copy() + defer session.Close() + db := session.DB(ms.db) if alsKeys == nil { utils.Logger.Info("Caching all aliases") - iter := ms.db.C(colAls).Find(nil).Select(bson.M{"key": 1}).Iter() + iter := db.C(colAls).Find(nil).Select(bson.M{"key": 1}).Iter() alsKeys = make([]string, 0) for iter.Next(&keyResult) { alsKeys = append(alsKeys, utils.ALIASES_PREFIX+keyResult.Key) } if err := iter.Close(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("aliases: %s", err.Error()) } } else if len(alsKeys) != 0 { utils.Logger.Info(fmt.Sprintf("Caching aliases: %v", alsKeys)) @@ -671,7 +698,7 @@ func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) { cache2go.RemKey(key) if _, err = ms.GetAlias(key[len(utils.ALIASES_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("aliases: %s", err.Error()) } } if len(alsKeys) != 0 { @@ -688,24 +715,27 @@ func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) { } func (ms *MongoStorage) HasData(category, subject string) (bool, error) { + session := ms.session.Copy() + defer session.Close() + db := session.DB(ms.db) switch category { case utils.DESTINATION_PREFIX: - count, err := ms.db.C(colDst).Find(bson.M{"key": subject}).Count() + count, err := db.C(colDst).Find(bson.M{"key": subject}).Count() return count > 0, err case utils.RATING_PLAN_PREFIX: - count, err := ms.db.C(colRpl).Find(bson.M{"key": subject}).Count() + count, err := db.C(colRpl).Find(bson.M{"key": subject}).Count() return count > 0, err case utils.RATING_PROFILE_PREFIX: - count, err := ms.db.C(colRpf).Find(bson.M{"id": subject}).Count() + count, err := db.C(colRpf).Find(bson.M{"id": subject}).Count() return count > 0, err case utils.ACTION_PREFIX: - count, err := ms.db.C(colAct).Find(bson.M{"key": subject}).Count() + count, err := db.C(colAct).Find(bson.M{"key": subject}).Count() return count > 0, err case utils.ACTION_PLAN_PREFIX: - count, err := ms.db.C(colApl).Find(bson.M{"key": subject}).Count() + count, err := db.C(colApl).Find(bson.M{"key": subject}).Count() return count > 0, err case utils.ACCOUNT_PREFIX: - count, err := ms.db.C(colAcc).Find(bson.M{"id": subject}).Count() + count, err := db.C(colAcc).Find(bson.M{"id": subject}).Count() return count > 0, err } return false, errors.New("unsupported category in HasData") @@ -724,7 +754,9 @@ func (ms *MongoStorage) GetRatingPlan(key string, skipCache bool) (rp *RatingPla Key string Value []byte } - err = ms.db.C(colRpl).Find(bson.M{"key": key}).One(&kv) + session, col := ms.conn(colRpl) + defer session.Close() + err = col.Find(bson.M{"key": key}).One(&kv) if err == nil { b := bytes.NewBuffer(kv.Value) r, err := zlib.NewReader(b) @@ -754,7 +786,9 @@ func (ms *MongoStorage) SetRatingPlan(rp *RatingPlan) error { w := zlib.NewWriter(&b) w.Write(result) w.Close() - _, err = ms.db.C(colRpl).Upsert(bson.M{"key": rp.Id}, &struct { + session, col := ms.conn(colRpl) + defer session.Close() + _, err = col.Upsert(bson.M{"key": rp.Id}, &struct { Key string Value []byte }{Key: rp.Id, Value: b.Bytes()}) @@ -774,7 +808,9 @@ func (ms *MongoStorage) GetRatingProfile(key string, skipCache bool) (rp *Rating } } rp = new(RatingProfile) - err = ms.db.C(colRpf).Find(bson.M{"id": key}).One(rp) + session, col := ms.conn(colRpf) + defer session.Close() + err = col.Find(bson.M{"id": key}).One(rp) if err == nil { cache2go.Cache(utils.RATING_PROFILE_PREFIX+key, rp) } @@ -782,7 +818,9 @@ func (ms *MongoStorage) GetRatingProfile(key string, skipCache bool) (rp *Rating } func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile) error { - _, err := ms.db.C(colRpf).Upsert(bson.M{"id": rp.Id}, rp) + session, col := ms.conn(colRpf) + defer session.Close() + _, err := col.Upsert(bson.M{"id": rp.Id}, rp) if err == nil && historyScribe != nil { var response int historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(false), &response) @@ -791,10 +829,12 @@ func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile) error { } func (ms *MongoStorage) RemoveRatingProfile(key string) error { - iter := ms.db.C(colRpf).Find(bson.M{"id": bson.RegEx{Pattern: key + ".*", Options: ""}}).Select(bson.M{"id": 1}).Iter() + session, col := ms.conn(colRpf) + defer session.Close() + iter := col.Find(bson.M{"id": bson.RegEx{Pattern: key + ".*", Options: ""}}).Select(bson.M{"id": 1}).Iter() var result struct{ Id string } for iter.Next(&result) { - if err := ms.db.C(colRpf).Remove(bson.M{"id": result.Id}); err != nil { + if err := col.Remove(bson.M{"id": result.Id}); err != nil { return err } cache2go.RemKey(utils.RATING_PROFILE_PREFIX + key) @@ -819,7 +859,9 @@ func (ms *MongoStorage) GetLCR(key string, skipCache bool) (lcr *LCR, err error) Key string Value *LCR } - err = ms.db.C(colLcr).Find(bson.M{"key": key}).One(&result) + session, col := ms.conn(colLcr) + defer session.Close() + err = col.Find(bson.M{"key": key}).One(&result) if err == nil { lcr = result.Value cache2go.Cache(utils.LCR_PREFIX+key, lcr) @@ -828,7 +870,9 @@ func (ms *MongoStorage) GetLCR(key string, skipCache bool) (lcr *LCR, err error) } func (ms *MongoStorage) SetLCR(lcr *LCR) error { - _, err := ms.db.C(colLcr).Upsert(bson.M{"key": lcr.GetId()}, &struct { + session, col := ms.conn(colLcr) + defer session.Close() + _, err := col.Upsert(bson.M{"key": lcr.GetId()}, &struct { Key string Value *LCR }{lcr.GetId(), lcr}) @@ -841,7 +885,9 @@ func (ms *MongoStorage) GetDestination(key string) (result *Destination, err err Key string Value []byte } - err = ms.db.C(colDst).Find(bson.M{"key": key}).One(&kv) + session, col := ms.conn(colDst) + defer session.Close() + err = col.Find(bson.M{"key": key}).One(&kv) if err == nil { b := bytes.NewBuffer(kv.Value) r, err := zlib.NewReader(b) @@ -877,7 +923,9 @@ func (ms *MongoStorage) SetDestination(dest *Destination) (err error) { w := zlib.NewWriter(&b) w.Write(result) w.Close() - _, err = ms.db.C(colDst).Upsert(bson.M{"key": dest.Id}, &struct { + session, col := ms.conn(colDst) + defer session.Close() + _, err = col.Upsert(bson.M{"key": dest.Id}, &struct { Key string Value []byte }{Key: dest.Id, Value: b.Bytes()}) @@ -904,7 +952,9 @@ func (ms *MongoStorage) GetActions(key string, skipCache bool) (as Actions, err Key string Value Actions } - err = ms.db.C(colAct).Find(bson.M{"key": key}).One(&result) + session, col := ms.conn(colAct) + defer session.Close() + err = col.Find(bson.M{"key": key}).One(&result) if err == nil { as = result.Value cache2go.Cache(utils.ACTION_PREFIX+key, as) @@ -913,7 +963,9 @@ func (ms *MongoStorage) GetActions(key string, skipCache bool) (as Actions, err } func (ms *MongoStorage) SetActions(key string, as Actions) error { - _, err := ms.db.C(colAct).Upsert(bson.M{"key": key}, &struct { + session, col := ms.conn(colAct) + defer session.Close() + _, err := col.Upsert(bson.M{"key": key}, &struct { Key string Value Actions }{Key: key, Value: as}) @@ -921,7 +973,9 @@ func (ms *MongoStorage) SetActions(key string, as Actions) error { } func (ms *MongoStorage) RemoveActions(key string) error { - return ms.db.C(colAct).Remove(bson.M{"key": key}) + session, col := ms.conn(colAct) + defer session.Close() + return col.Remove(bson.M{"key": key}) } func (ms *MongoStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGroup, err error) { @@ -932,8 +986,10 @@ func (ms *MongoStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGr return nil, err } } + session, col := ms.conn(colShg) + defer session.Close() sg = &SharedGroup{} - err = ms.db.C(colShg).Find(bson.M{"id": key}).One(sg) + err = col.Find(bson.M{"id": key}).One(sg) if err == nil { cache2go.Cache(utils.SHARED_GROUP_PREFIX+key, sg) } @@ -941,13 +997,17 @@ func (ms *MongoStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGr } func (ms *MongoStorage) SetSharedGroup(sg *SharedGroup) (err error) { - _, err = ms.db.C(colShg).Upsert(bson.M{"id": sg.Id}, sg) + session, col := ms.conn(colShg) + defer session.Close() + _, err = col.Upsert(bson.M{"id": sg.Id}, sg) return err } func (ms *MongoStorage) GetAccount(key string) (result *Account, err error) { result = new(Account) - err = ms.db.C(colAcc).Find(bson.M{"id": key}).One(result) + session, col := ms.conn(colAcc) + defer session.Close() + err = col.Find(bson.M{"id": key}).One(result) if err == mgo.ErrNotFound { result = nil } @@ -967,12 +1027,16 @@ func (ms *MongoStorage) SetAccount(acc *Account) error { acc = ac } } - _, err := ms.db.C(colAcc).Upsert(bson.M{"id": acc.ID}, acc) + session, col := ms.conn(colAcc) + defer session.Close() + _, err := col.Upsert(bson.M{"id": acc.ID}, acc) return err } func (ms *MongoStorage) RemoveAccount(key string) error { - return ms.db.C(colAcc).Remove(bson.M{"id": key}) + session, col := ms.conn(colAcc) + defer session.Close() + return col.Remove(bson.M{"id": key}) } @@ -981,7 +1045,9 @@ func (ms *MongoStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) Key string Value *StatsQueue } - err = ms.db.C(colStq).Find(bson.M{"key": key}).One(&result) + session, col := ms.conn(colStq) + defer session.Close() + err = col.Find(bson.M{"key": key}).One(&result) if err == nil { sq = result.Value } @@ -989,7 +1055,9 @@ func (ms *MongoStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) } func (ms *MongoStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { - _, err = ms.db.C(colStq).Upsert(bson.M{"key": sq.GetId()}, &struct { + session, col := ms.conn(colStq) + defer session.Close() + _, err = col.Upsert(bson.M{"key": sq.GetId()}, &struct { Key string Value *StatsQueue }{Key: sq.GetId(), Value: sq}) @@ -997,7 +1065,9 @@ func (ms *MongoStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) { } func (ms *MongoStorage) GetSubscribers() (result map[string]*SubscriberData, err error) { - iter := ms.db.C(colPbs).Find(nil).Iter() + session, col := ms.conn(colPbs) + defer session.Close() + iter := col.Find(nil).Iter() result = make(map[string]*SubscriberData) var kv struct { Key string @@ -1011,7 +1081,9 @@ func (ms *MongoStorage) GetSubscribers() (result map[string]*SubscriberData, err } func (ms *MongoStorage) SetSubscriber(key string, sub *SubscriberData) (err error) { - _, err = ms.db.C(colPbs).Upsert(bson.M{"key": key}, &struct { + session, col := ms.conn(colPbs) + defer session.Close() + _, err = col.Upsert(bson.M{"key": key}, &struct { Key string Value *SubscriberData }{Key: key, Value: sub}) @@ -1019,11 +1091,15 @@ func (ms *MongoStorage) SetSubscriber(key string, sub *SubscriberData) (err erro } func (ms *MongoStorage) RemoveSubscriber(key string) (err error) { - return ms.db.C(colPbs).Remove(bson.M{"key": key}) + session, col := ms.conn(colPbs) + defer session.Close() + return col.Remove(bson.M{"key": key}) } func (ms *MongoStorage) SetUser(up *UserProfile) (err error) { - _, err = ms.db.C(colUsr).Upsert(bson.M{"key": up.GetId()}, &struct { + session, col := ms.conn(colUsr) + defer session.Close() + _, err = col.Upsert(bson.M{"key": up.GetId()}, &struct { Key string Value *UserProfile }{Key: up.GetId(), Value: up}) @@ -1035,7 +1111,9 @@ func (ms *MongoStorage) GetUser(key string) (up *UserProfile, err error) { Key string Value *UserProfile } - err = ms.db.C(colUsr).Find(bson.M{"key": key}).One(&kv) + session, col := ms.conn(colUsr) + defer session.Close() + err = col.Find(bson.M{"key": key}).One(&kv) if err == nil { up = kv.Value } @@ -1043,7 +1121,9 @@ func (ms *MongoStorage) GetUser(key string) (up *UserProfile, err error) { } func (ms *MongoStorage) GetUsers() (result []*UserProfile, err error) { - iter := ms.db.C(colUsr).Find(nil).Iter() + session, col := ms.conn(colUsr) + defer session.Close() + iter := col.Find(nil).Iter() var kv struct { Key string Value *UserProfile @@ -1056,11 +1136,15 @@ func (ms *MongoStorage) GetUsers() (result []*UserProfile, err error) { } func (ms *MongoStorage) RemoveUser(key string) (err error) { - return ms.db.C(colUsr).Remove(bson.M{"key": key}) + session, col := ms.conn(colUsr) + defer session.Close() + return col.Remove(bson.M{"key": key}) } func (ms *MongoStorage) SetAlias(al *Alias) (err error) { - _, err = ms.db.C(colAls).Upsert(bson.M{"key": al.GetId()}, &struct { + session, col := ms.conn(colAls) + defer session.Close() + _, err = col.Upsert(bson.M{"key": al.GetId()}, &struct { Key string Value AliasValues }{Key: al.GetId(), Value: al.Values}) @@ -1083,7 +1167,9 @@ func (ms *MongoStorage) GetAlias(key string, skipCache bool) (al *Alias, err err Key string Value AliasValues } - if err = ms.db.C(colAls).Find(bson.M{"key": origKey}).One(&kv); err == nil { + session, col := ms.conn(colAls) + defer session.Close() + if err = col.Find(bson.M{"key": origKey}).One(&kv); err == nil { al = &Alias{Values: kv.Value} al.SetId(origKey) if err == nil { @@ -1104,10 +1190,12 @@ func (ms *MongoStorage) RemoveAlias(key string) (err error) { Key string Value AliasValues } - if err := ms.db.C(colAls).Find(bson.M{"key": origKey}).One(&kv); err == nil { + session, col := ms.conn(colAls) + defer session.Close() + if err := col.Find(bson.M{"key": origKey}).One(&kv); err == nil { al.Values = kv.Value } - err = ms.db.C(colAls).Remove(bson.M{"key": origKey}) + err = col.Remove(bson.M{"key": origKey}) if err == nil { al.RemoveReverseCache() cache2go.RemKey(key) @@ -1135,7 +1223,9 @@ func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool) (loadInsts []* Key string Value []*LoadInstance } - err = ms.db.C(colLht).Find(bson.M{"key": utils.LOADINST_KEY}).One(&kv) + session, col := ms.conn(colLht) + defer session.Close() + err = col.Find(bson.M{"key": utils.LOADINST_KEY}).One(&kv) if err == nil { loadInsts = kv.Value cache2go.RemKey(utils.LOADINST_KEY) @@ -1155,7 +1245,9 @@ func (ms *MongoStorage) AddLoadHistory(ldInst *LoadInstance, loadHistSize int) e Key string Value []*LoadInstance } - err := ms.db.C(colLht).Find(bson.M{"key": utils.LOADINST_KEY}).One(&kv) + session, col := ms.conn(colLht) + defer session.Close() + err := col.Find(bson.M{"key": utils.LOADINST_KEY}).One(&kv) if err != nil && err != mgo.ErrNotFound { return err @@ -1176,7 +1268,9 @@ func (ms *MongoStorage) AddLoadHistory(ldInst *LoadInstance, loadHistSize int) e if histLen >= loadHistSize { // Have hit maximum history allowed, remove oldest element in order to add new one existingLoadHistory = existingLoadHistory[:loadHistSize] } - _, err = ms.db.C(colLht).Upsert(bson.M{"key": utils.LOADINST_KEY}, &struct { + session, col := ms.conn(colLht) + defer session.Close() + _, err = col.Upsert(bson.M{"key": utils.LOADINST_KEY}, &struct { Key string Value []*LoadInstance }{Key: utils.LOADINST_KEY, Value: existingLoadHistory}) @@ -1190,7 +1284,9 @@ func (ms *MongoStorage) GetActionTriggers(key string) (atrs ActionTriggers, err Key string Value ActionTriggers } - err = ms.db.C(colAtr).Find(bson.M{"key": key}).One(&kv) + session, col := ms.conn(colAtr) + defer session.Close() + err = col.Find(bson.M{"key": key}).One(&kv) if err == nil { atrs = kv.Value } @@ -1198,14 +1294,16 @@ func (ms *MongoStorage) GetActionTriggers(key string) (atrs ActionTriggers, err } func (ms *MongoStorage) SetActionTriggers(key string, atrs ActionTriggers) (err error) { + session, col := ms.conn(colAtr) + defer session.Close() if len(atrs) == 0 { - err = ms.db.C(colAtr).Remove(bson.M{"key": key}) // delete the key + err = col.Remove(bson.M{"key": key}) // delete the key if err != mgo.ErrNotFound { return err } return nil } - _, err = ms.db.C(colAtr).Upsert(bson.M{"key": key}, &struct { + _, err = col.Upsert(bson.M{"key": key}, &struct { Key string Value ActionTriggers }{Key: key, Value: atrs}) @@ -1224,7 +1322,9 @@ func (ms *MongoStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl Key string Value []byte } - err = ms.db.C(colApl).Find(bson.M{"key": key}).One(&kv) + session, col := ms.conn(colApl) + defer session.Close() + err = col.Find(bson.M{"key": key}).One(&kv) if err == nil { b := bytes.NewBuffer(kv.Value) r, err := zlib.NewReader(b) @@ -1246,10 +1346,12 @@ func (ms *MongoStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl } func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan, overwrite bool) error { + session, col := ms.conn(colApl) + defer session.Close() // clean dots from account ids map if len(ats.ActionTimings) == 0 { cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key) - err := ms.db.C(colApl).Remove(bson.M{"key": key}) + err := col.Remove(bson.M{"key": key}) if err != mgo.ErrNotFound { return err } @@ -1274,7 +1376,7 @@ func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan, overwrite boo w := zlib.NewWriter(&b) w.Write(result) w.Close() - _, err = ms.db.C(colApl).Upsert(bson.M{"key": key}, &struct { + _, err = col.Upsert(bson.M{"key": key}, &struct { Key string Value []byte }{Key: key, Value: b.Bytes()}) @@ -1297,7 +1399,9 @@ func (ms *MongoStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err } func (ms *MongoStorage) PushTask(t *Task) error { - return ms.db.C(colTsk).Insert(bson.M{"_id": bson.NewObjectId(), "task": t}) + session, col := ms.conn(colTsk) + defer session.Close() + return col.Insert(bson.M{"_id": bson.NewObjectId(), "task": t}) } func (ms *MongoStorage) PopTask() (t *Task, err error) { @@ -1305,8 +1409,10 @@ func (ms *MongoStorage) PopTask() (t *Task, err error) { ID bson.ObjectId `bson:"_id"` Task *Task }{} - if err = ms.db.C(colTsk).Find(nil).One(&v); err == nil { - if remErr := ms.db.C(colTsk).Remove(bson.M{"_id": v.ID}); remErr != nil { + session, col := ms.conn(colTsk) + defer session.Close() + if err = col.Find(nil).One(&v); err == nil { + if remErr := col.Remove(bson.M{"_id": v.ID}); remErr != nil { return nil, remErr } t = v.Task @@ -1327,7 +1433,9 @@ func (ms *MongoStorage) GetDerivedChargers(key string, skipCache bool) (dcs *uti Key string Value *utils.DerivedChargers } - err = ms.db.C(colDcs).Find(bson.M{"key": key}).One(&kv) + session, col := ms.conn(colDcs) + defer session.Close() + err = col.Find(bson.M{"key": key}).One(&kv) if err == nil { dcs = kv.Value cache2go.Cache(utils.DERIVEDCHARGERS_PREFIX+key, dcs) @@ -1338,13 +1446,17 @@ func (ms *MongoStorage) GetDerivedChargers(key string, skipCache bool) (dcs *uti func (ms *MongoStorage) SetDerivedChargers(key string, dcs *utils.DerivedChargers) (err error) { if dcs == nil || len(dcs.Chargers) == 0 { cache2go.RemKey(utils.DERIVEDCHARGERS_PREFIX + key) - err = ms.db.C(colDcs).Remove(bson.M{"key": key}) + session, col := ms.conn(colDcs) + defer session.Close() + err = col.Remove(bson.M{"key": key}) if err != mgo.ErrNotFound { return err } return nil } - _, err = ms.db.C(colDcs).Upsert(bson.M{"key": key}, &struct { + session, col := ms.conn(colDcs) + defer session.Close() + _, err = col.Upsert(bson.M{"key": key}, &struct { Key string Value *utils.DerivedChargers }{Key: key, Value: dcs}) @@ -1352,18 +1464,24 @@ func (ms *MongoStorage) SetDerivedChargers(key string, dcs *utils.DerivedCharger } func (ms *MongoStorage) SetCdrStats(cs *CdrStats) error { - _, err := ms.db.C(colCrs).Upsert(bson.M{"id": cs.Id}, cs) + session, col := ms.conn(colCrs) + defer session.Close() + _, err := col.Upsert(bson.M{"id": cs.Id}, cs) return err } func (ms *MongoStorage) GetCdrStats(key string) (cs *CdrStats, err error) { cs = &CdrStats{} - err = ms.db.C(colCrs).Find(bson.M{"id": key}).One(cs) + session, col := ms.conn(colCrs) + defer session.Close() + err = col.Find(bson.M{"id": key}).One(cs) return } func (ms *MongoStorage) GetAllCdrStats() (css []*CdrStats, err error) { - iter := ms.db.C(colCrs).Find(nil).Iter() + session, col := ms.conn(colCrs) + defer session.Close() + iter := col.Find(nil).Iter() var cs CdrStats for iter.Next(&cs) { clone := cs // avoid using the same pointer in append @@ -1374,7 +1492,9 @@ func (ms *MongoStorage) GetAllCdrStats() (css []*CdrStats, err error) { } func (ms *MongoStorage) SetStructVersion(v *StructVersion) (err error) { - _, err = ms.db.C(colVer).Upsert(bson.M{"key": utils.VERSION_PREFIX + "struct"}, &struct { + session, col := ms.conn(colVer) + defer session.Close() + _, err = col.Upsert(bson.M{"key": utils.VERSION_PREFIX + "struct"}, &struct { Key string Value *StructVersion }{utils.VERSION_PREFIX + "struct", v}) @@ -1386,8 +1506,9 @@ func (ms *MongoStorage) GetStructVersion() (rsv *StructVersion, err error) { Key string Value StructVersion } - - err = ms.db.C(colVer).Find(bson.M{"key": utils.VERSION_PREFIX + "struct"}).One(&result) + session, col := ms.conn(colVer) + defer session.Close() + err = col.Find(bson.M{"key": utils.VERSION_PREFIX + "struct"}).One(&result) if err == mgo.ErrNotFound { rsv = nil } diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index 4edd9ae8e..b07c8ba65 100644 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -12,14 +12,17 @@ import ( func (ms *MongoStorage) GetTpIds() ([]string, error) { tpidMap := make(map[string]bool) - cols, err := ms.db.CollectionNames() + session := ms.session.Copy() + db := session.DB(ms.db) + defer session.Close() + cols, err := db.CollectionNames() if err != nil { return nil, err } for _, col := range cols { if strings.HasPrefix(col, "tp_") { tpids := make([]string, 0) - if err := ms.db.C(col).Find(nil).Select(bson.M{"tpid": 1}).Distinct("tpid", &tpids); err != nil { + if err := db.C(col).Find(nil).Select(bson.M{"tpid": 1}).Distinct("tpid", &tpids); err != nil { return nil, err } for _, tpid := range tpids { @@ -55,7 +58,9 @@ func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinct utils.TPDisti } findMap["$and"] = []bson.M{bson.M{"$or": searchItems}} } - q := ms.db.C(table).Find(findMap) + session, col := ms.conn(table) + defer session.Close() + q := col.Find(findMap) if pag != nil { if pag.Limit != nil { q = q.Limit(*pag.Limit) @@ -99,7 +104,9 @@ func (ms *MongoStorage) GetTpTimings(tpid, tag string) ([]TpTiming, error) { filter["tag"] = tag } var results []TpTiming - err := ms.db.C(utils.TBL_TP_TIMINGS).Find(filter).All(&results) + session, col := ms.conn(utils.TBL_TP_TIMINGS) + defer session.Close() + err := col.Find(filter).All(&results) return results, err } @@ -111,7 +118,9 @@ func (ms *MongoStorage) GetTpDestinations(tpid, tag string) ([]TpDestination, er filter["tag"] = tag } var results []TpDestination - err := ms.db.C(utils.TBL_TP_DESTINATIONS).Find(filter).All(&results) + session, col := ms.conn(utils.TBL_TP_DESTINATIONS) + defer session.Close() + err := col.Find(filter).All(&results) return results, err } @@ -123,7 +132,9 @@ func (ms *MongoStorage) GetTpRates(tpid, tag string) ([]TpRate, error) { filter["tag"] = tag } var results []TpRate - err := ms.db.C(utils.TBL_TP_RATES).Find(filter).All(&results) + session, col := ms.conn(utils.TBL_TP_RATES) + defer session.Close() + err := col.Find(filter).All(&results) return results, err } @@ -135,7 +146,9 @@ func (ms *MongoStorage) GetTpDestinationRates(tpid, tag string, pag *utils.Pagin filter["tag"] = tag } var results []TpDestinationRate - q := ms.db.C(utils.TBL_TP_DESTINATION_RATES).Find(filter) + session, col := ms.conn(utils.TBL_TP_DESTINATION_RATES) + defer session.Close() + q := col.Find(filter) if pag != nil { if pag.Limit != nil { q = q.Limit(*pag.Limit) @@ -156,7 +169,9 @@ func (ms *MongoStorage) GetTpRatingPlans(tpid, tag string, pag *utils.Paginator) filter["tag"] = tag } var results []TpRatingPlan - q := ms.db.C(utils.TBL_TP_RATING_PLANS).Find(filter) + session, col := ms.conn(utils.TBL_TP_RATING_PLANS) + defer session.Close() + q := col.Find(filter) if pag != nil { if pag.Limit != nil { q = q.Limit(*pag.Limit) @@ -187,7 +202,9 @@ func (ms *MongoStorage) GetTpRatingProfiles(tp *TpRatingProfile) ([]TpRatingProf filter["loadid"] = tp.Loadid } var results []TpRatingProfile - err := ms.db.C(utils.TBL_TP_RATE_PROFILES).Find(filter).All(&results) + session, col := ms.conn(utils.TBL_TP_RATE_PROFILES) + defer session.Close() + err := col.Find(filter).All(&results) return results, err } @@ -199,7 +216,9 @@ func (ms *MongoStorage) GetTpSharedGroups(tpid, tag string) ([]TpSharedGroup, er filter["tag"] = tag } var results []TpSharedGroup - err := ms.db.C(utils.TBL_TP_SHARED_GROUPS).Find(filter).All(&results) + session, col := ms.conn(utils.TBL_TP_SHARED_GROUPS) + defer session.Close() + err := col.Find(filter).All(&results) return results, err } @@ -211,7 +230,9 @@ func (ms *MongoStorage) GetTpCdrStats(tpid, tag string) ([]TpCdrstat, error) { filter["tag"] = tag } var results []TpCdrstat - err := ms.db.C(utils.TBL_TP_CDR_STATS).Find(filter).All(&results) + session, col := ms.conn(utils.TBL_TP_CDR_STATS) + defer session.Close() + err := col.Find(filter).All(&results) return results, err } func (ms *MongoStorage) GetTpLCRs(tp *TpLcrRule) ([]TpLcrRule, error) { @@ -232,7 +253,9 @@ func (ms *MongoStorage) GetTpLCRs(tp *TpLcrRule) ([]TpLcrRule, error) { filter["subject"] = tp.Subject } var results []TpLcrRule - err := ms.db.C(utils.TBL_TP_LCRS).Find(filter).All(&results) + session, col := ms.conn(utils.TBL_TP_LCRS) + defer session.Close() + err := col.Find(filter).All(&results) return results, err } @@ -245,7 +268,9 @@ func (ms *MongoStorage) GetTpUsers(tp *TpUser) ([]TpUser, error) { filter["username"] = tp.UserName } var results []TpUser - err := ms.db.C(utils.TBL_TP_USERS).Find(filter).All(&results) + session, col := ms.conn(utils.TBL_TP_USERS) + defer session.Close() + err := col.Find(filter).All(&results) return results, err } @@ -270,7 +295,9 @@ func (ms *MongoStorage) GetTpAliases(tp *TpAlias) ([]TpAlias, error) { filter["context"] = tp.Context } var results []TpAlias - err := ms.db.C(utils.TBL_TP_ALIASES).Find(filter).All(&results) + session, col := ms.conn(utils.TBL_TP_ALIASES) + defer session.Close() + err := col.Find(filter).All(&results) return results, err } @@ -295,7 +322,9 @@ func (ms *MongoStorage) GetTpDerivedChargers(tp *TpDerivedCharger) ([]TpDerivedC filter["loadid"] = tp.Loadid } var results []TpDerivedCharger - err := ms.db.C(utils.TBL_TP_DERIVED_CHARGERS).Find(filter).All(&results) + session, col := ms.conn(utils.TBL_TP_DERIVED_CHARGERS) + defer session.Close() + err := col.Find(filter).All(&results) return results, err } @@ -307,7 +336,9 @@ func (ms *MongoStorage) GetTpActions(tpid, tag string) ([]TpAction, error) { filter["tag"] = tag } var results []TpAction - err := ms.db.C(utils.TBL_TP_ACTIONS).Find(filter).All(&results) + session, col := ms.conn(utils.TBL_TP_ACTIONS) + defer session.Close() + err := col.Find(filter).All(&results) return results, err } @@ -319,7 +350,9 @@ func (ms *MongoStorage) GetTpActionPlans(tpid, tag string) ([]TpActionPlan, erro filter["tag"] = tag } var results []TpActionPlan - err := ms.db.C(utils.TBL_TP_ACTION_PLANS).Find(filter).All(&results) + session, col := ms.conn(utils.TBL_TP_ACTION_PLANS) + defer session.Close() + err := col.Find(filter).All(&results) return results, err } @@ -331,7 +364,9 @@ func (ms *MongoStorage) GetTpActionTriggers(tpid, tag string) ([]TpActionTrigger filter["tag"] = tag } var results []TpActionTrigger - err := ms.db.C(utils.TBL_TP_ACTION_TRIGGERS).Find(filter).All(&results) + session, col := ms.conn(utils.TBL_TP_ACTION_TRIGGERS) + defer session.Close() + err := col.Find(filter).All(&results) return results, err } @@ -347,19 +382,24 @@ func (ms *MongoStorage) GetTpAccountActions(tp *TpAccountAction) ([]TpAccountAct filter["loadid"] = tp.Loadid } var results []TpAccountAction - err := ms.db.C(utils.TBL_TP_ACCOUNT_ACTIONS).Find(filter).All(&results) + session, col := ms.conn(utils.TBL_TP_ACCOUNT_ACTIONS) + defer session.Close() + err := col.Find(filter).All(&results) return results, err } func (ms *MongoStorage) RemTpData(table, tpid string, args map[string]string) error { + session := ms.session.Copy() + db := session.DB(ms.db) + defer session.Close() if len(table) == 0 { // Remove tpid out of all tables - cols, err := ms.db.CollectionNames() + cols, err := db.CollectionNames() if err != nil { return err } for _, col := range cols { if strings.HasPrefix(col, "tp_") { - if _, err := ms.db.C(col).RemoveAll(bson.M{"tpid": tpid}); err != nil { + if _, err := db.C(col).RemoveAll(bson.M{"tpid": tpid}); err != nil { return err } } @@ -371,7 +411,7 @@ func (ms *MongoStorage) RemTpData(table, tpid string, args map[string]string) er args = make(map[string]string) } args["tpid"] = tpid - return ms.db.C(table).Remove(args) + return db.C(table).Remove(args) } func (ms *MongoStorage) SetTpTimings(tps []TpTiming) error { @@ -379,8 +419,9 @@ func (ms *MongoStorage) SetTpTimings(tps []TpTiming) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_TIMINGS).Bulk() + session, col := ms.conn(utils.TBL_TP_TIMINGS) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -396,8 +437,9 @@ func (ms *MongoStorage) SetTpDestinations(tps []TpDestination) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_DESTINATIONS).Bulk() + session, col := ms.conn(utils.TBL_TP_DESTINATIONS) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -413,8 +455,9 @@ func (ms *MongoStorage) SetTpRates(tps []TpRate) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_RATES).Bulk() + session, col := ms.conn(utils.TBL_TP_RATES) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -430,8 +473,9 @@ func (ms *MongoStorage) SetTpDestinationRates(tps []TpDestinationRate) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_DESTINATION_RATES).Bulk() + session, col := ms.conn(utils.TBL_TP_DESTINATION_RATES) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -447,8 +491,9 @@ func (ms *MongoStorage) SetTpRatingPlans(tps []TpRatingPlan) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_RATING_PLANS).Bulk() + session, col := ms.conn(utils.TBL_TP_RATING_PLANS) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -464,8 +509,9 @@ func (ms *MongoStorage) SetTpRatingProfiles(tps []TpRatingProfile) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_RATE_PROFILES).Bulk() + session, col := ms.conn(utils.TBL_TP_RATE_PROFILES) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.GetRatingProfileId()]; !found { m[tp.GetRatingProfileId()] = true @@ -488,8 +534,9 @@ func (ms *MongoStorage) SetTpSharedGroups(tps []TpSharedGroup) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_SHARED_GROUPS).Bulk() + session, col := ms.conn(utils.TBL_TP_SHARED_GROUPS) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -505,8 +552,9 @@ func (ms *MongoStorage) SetTpCdrStats(tps []TpCdrstat) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_CDR_STATS).Bulk() + session, col := ms.conn(utils.TBL_TP_CDR_STATS) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -522,8 +570,9 @@ func (ms *MongoStorage) SetTpUsers(tps []TpUser) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_USERS).Bulk() + session, col := ms.conn(utils.TBL_TP_USERS) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.GetId()]; !found { m[tp.GetId()] = true @@ -543,8 +592,9 @@ func (ms *MongoStorage) SetTpAliases(tps []TpAlias) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_ALIASES).Bulk() + session, col := ms.conn(utils.TBL_TP_ALIASES) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.GetId()]; !found { m[tp.GetId()] = true @@ -567,8 +617,9 @@ func (ms *MongoStorage) SetTpDerivedChargers(tps []TpDerivedCharger) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_DERIVED_CHARGERS).Bulk() + session, col := ms.conn(utils.TBL_TP_DERIVED_CHARGERS) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.GetDerivedChargersId()]; !found { m[tp.GetDerivedChargersId()] = true @@ -590,8 +641,9 @@ func (ms *MongoStorage) SetTpLCRs(tps []TpLcrRule) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_LCRS).Bulk() + session, col := ms.conn(utils.TBL_TP_LCRS) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.GetLcrRuleId()]; !found { m[tp.GetLcrRuleId()] = true @@ -613,8 +665,9 @@ func (ms *MongoStorage) SetTpActions(tps []TpAction) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_ACTIONS).Bulk() + session, col := ms.conn(utils.TBL_TP_ACTIONS) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -630,8 +683,9 @@ func (ms *MongoStorage) SetTpActionPlans(tps []TpActionPlan) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_ACTION_PLANS).Bulk() + session, col := ms.conn(utils.TBL_TP_ACTION_PLANS) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -647,8 +701,9 @@ func (ms *MongoStorage) SetTpActionTriggers(tps []TpActionTrigger) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_ACTION_TRIGGERS).Bulk() + session, col := ms.conn(utils.TBL_TP_ACTION_TRIGGERS) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.Tag]; !found { m[tp.Tag] = true @@ -664,8 +719,9 @@ func (ms *MongoStorage) SetTpAccountActions(tps []TpAccountAction) error { return nil } m := make(map[string]bool) - - tx := ms.db.C(utils.TBL_TP_ACCOUNT_ACTIONS).Bulk() + session, col := ms.conn(utils.TBL_TP_ACCOUNT_ACTIONS) + defer session.Close() + tx := col.Bulk() for _, tp := range tps { if found, _ := m[tp.GetAccountActionId()]; !found { m[tp.GetAccountActionId()] = true @@ -681,7 +737,9 @@ func (ms *MongoStorage) SetTpAccountActions(tps []TpAccountAction) error { } func (ms *MongoStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) { - return ms.db.C(colLogAtr).Insert(&struct { + session, col := ms.conn(colLogAtr) + defer session.Close() + return col.Insert(&struct { ubId string ActionTrigger *ActionTrigger Actions Actions @@ -691,7 +749,9 @@ func (ms *MongoStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, } func (ms *MongoStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) { - return ms.db.C(colLogApl).Insert(&struct { + session, col := ms.conn(colLogApl) + defer session.Close() + return col.Insert(&struct { ActionPlan *ActionTiming Actions Actions LogTime time.Time @@ -700,7 +760,9 @@ func (ms *MongoStorage) LogActionTiming(source string, at *ActionTiming, as Acti } func (ms *MongoStorage) SetSMCost(smc *SMCost) error { - return ms.db.C(utils.TBLSMCosts).Insert(smc) + session, col := ms.conn(utils.TBLSMCosts) + defer session.Close() + return col.Insert(smc) } func (ms *MongoStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix string) (smcs []*SMCost, err error) { @@ -709,7 +771,9 @@ func (ms *MongoStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri filter = bson.M{OriginIDLow: bson.M{"$regex": bson.RegEx{Pattern: fmt.Sprintf("^%s", originIDPrefix)}}, OriginHostLow: originHost, RunIDLow: runid} } // Execute query - iter := ms.db.C(utils.TBLSMCosts).Find(filter).Iter() + session, col := ms.conn(utils.TBLSMCosts) + defer session.Close() + iter := col.Find(filter).Iter() var smCost SMCost for iter.Next(&smCost) { smcs = append(smcs, &smCost) @@ -724,10 +788,12 @@ func (ms *MongoStorage) SetCDR(cdr *CDR, allowUpdate bool) (err error) { if cdr.OrderID == 0 { cdr.OrderID = time.Now().UnixNano() } + session, col := ms.conn(utils.TBL_CDRS) + defer session.Close() if allowUpdate { - _, err = ms.db.C(utils.TBL_CDRS).Upsert(bson.M{CGRIDLow: cdr.CGRID, RunIDLow: cdr.RunID}, cdr) + _, err = col.Upsert(bson.M{CGRIDLow: cdr.CGRID, RunIDLow: cdr.RunID}, cdr) } else { - err = ms.db.C(utils.TBL_CDRS).Insert(cdr) + err = col.Insert(cdr) } return err } @@ -764,7 +830,7 @@ func (ms *MongoStorage) cleanEmptyFilters(filters bson.M) { } } -// _, err := ms.db.C(utils.TBL_CDRS).UpdateAll(bson.M{CGRIDLow: bson.M{"$in": cgrIds}}, bson.M{"$set": bson.M{"deleted_at": time.Now()}}) +// _, err := col(utils.TBL_CDRS).UpdateAll(bson.M{CGRIDLow: bson.M{"$in": cgrIds}}, bson.M{"$set": bson.M{"deleted_at": time.Now()}}) func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, int64, error) { var minPDD, maxPDD, minUsage, maxUsage *time.Duration if len(qryFltr.MinPDD) != 0 { @@ -886,7 +952,8 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, } //file.WriteString(fmt.Sprintf("AFTER: %v\n", utils.ToIJSON(filters))) //file.Close() - col := ms.db.C(utils.TBL_CDRS) + session, col := ms.conn(utils.TBL_CDRS) + defer session.Close() if remove { if chgd, err := col.RemoveAll(filters); err != nil { return nil, 0, err diff --git a/engine/storage_redis.go b/engine/storage_redis.go index fba52118d..201a4585a 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -151,7 +151,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac utils.Logger.Info("Caching all destinations") if dKeys, err = conn.Cmd("KEYS", utils.DESTINATION_PREFIX+"*").List(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("destinations: %s", err.Error()) } cache2go.RemPrefixKey(utils.DESTINATION_PREFIX) } else if len(dKeys) != 0 { @@ -165,7 +165,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac } if _, err = rs.GetDestination(key[len(utils.DESTINATION_PREFIX):]); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("destinations: %s", err.Error()) } } if len(dKeys) != 0 { @@ -175,7 +175,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac utils.Logger.Info("Caching all rating plans") if rpKeys, err = conn.Cmd("KEYS", utils.RATING_PLAN_PREFIX+"*").List(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("rating plans: %s", err.Error()) } cache2go.RemPrefixKey(utils.RATING_PLAN_PREFIX) } else if len(rpKeys) != 0 { @@ -185,7 +185,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.RemKey(key) if _, err = rs.GetRatingPlan(key[len(utils.RATING_PLAN_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("rating plans: %s", err.Error()) } } if len(rpKeys) != 0 { @@ -195,7 +195,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac utils.Logger.Info("Caching all rating profiles") if rpfKeys, err = conn.Cmd("KEYS", utils.RATING_PROFILE_PREFIX+"*").List(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("rating profiles: %s", err.Error()) } cache2go.RemPrefixKey(utils.RATING_PROFILE_PREFIX) } else if len(rpfKeys) != 0 { @@ -205,7 +205,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.RemKey(key) if _, err = rs.GetRatingProfile(key[len(utils.RATING_PROFILE_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("rating profiles: %s", err.Error()) } } if len(rpfKeys) != 0 { @@ -215,7 +215,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac utils.Logger.Info("Caching LCR rules.") if lcrKeys, err = conn.Cmd("KEYS", utils.LCR_PREFIX+"*").List(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("lcr rules: %s", err.Error()) } cache2go.RemPrefixKey(utils.LCR_PREFIX) } else if len(lcrKeys) != 0 { @@ -225,7 +225,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.RemKey(key) if _, err = rs.GetLCR(key[len(utils.LCR_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("lcr rules: %s", err.Error()) } } if len(lcrKeys) != 0 { @@ -236,7 +236,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac utils.Logger.Info("Caching all derived chargers") if dcsKeys, err = conn.Cmd("KEYS", utils.DERIVEDCHARGERS_PREFIX+"*").List(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("derived chargers: %s", err.Error()) } cache2go.RemPrefixKey(utils.DERIVEDCHARGERS_PREFIX) } else if len(dcsKeys) != 0 { @@ -246,7 +246,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.RemKey(key) if _, err = rs.GetDerivedChargers(key[len(utils.DERIVEDCHARGERS_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("derived chargers: %s", err.Error()) } } if len(dcsKeys) != 0 { @@ -256,7 +256,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac utils.Logger.Info("Caching all actions") if actKeys, err = conn.Cmd("KEYS", utils.ACTION_PREFIX+"*").List(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("actions: %s", err.Error()) } cache2go.RemPrefixKey(utils.ACTION_PREFIX) } else if len(actKeys) != 0 { @@ -266,7 +266,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.RemKey(key) if _, err = rs.GetActions(key[len(utils.ACTION_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("actions: %s", err.Error()) } } if len(actKeys) != 0 { @@ -277,7 +277,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac utils.Logger.Info("Caching all action plans") if aplKeys, err = rs.db.Cmd("KEYS", utils.ACTION_PLAN_PREFIX+"*").List(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf(" %s", err.Error()) } cache2go.RemPrefixKey(utils.ACTION_PLAN_PREFIX) } else if len(aplKeys) != 0 { @@ -287,7 +287,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.RemKey(key) if _, err = rs.GetActionPlan(key[len(utils.ACTION_PLAN_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf(" %s", err.Error()) } } if len(aplKeys) != 0 { @@ -298,7 +298,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac utils.Logger.Info("Caching all shared groups") if shgKeys, err = conn.Cmd("KEYS", utils.SHARED_GROUP_PREFIX+"*").List(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("shared groups: %s", err.Error()) } cache2go.RemPrefixKey(utils.SHARED_GROUP_PREFIX) } else if len(shgKeys) != 0 { @@ -308,7 +308,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac cache2go.RemKey(key) if _, err = rs.GetSharedGroup(key[len(utils.SHARED_GROUP_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("shared groups: %s", err.Error()) } } if len(shgKeys) != 0 { @@ -360,7 +360,7 @@ func (rs *RedisStorage) cacheAccounting(alsKeys []string) (err error) { utils.Logger.Info("Caching all aliases") if alsKeys, err = conn.Cmd("KEYS", utils.ALIASES_PREFIX+"*").List(); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("aliases: %s", err.Error()) } cache2go.RemPrefixKey(utils.ALIASES_PREFIX) cache2go.RemPrefixKey(utils.REVERSE_ALIASES_PREFIX) @@ -379,7 +379,7 @@ func (rs *RedisStorage) cacheAccounting(alsKeys []string) (err error) { cache2go.RemKey(key) if _, err = rs.GetAlias(key[len(utils.ALIASES_PREFIX):], true); err != nil { cache2go.RollbackTransaction() - return err + return fmt.Errorf("aliases: %s", err.Error()) } } if len(alsKeys) != 0 {