mongo reconnect

This commit is contained in:
Radu Ioan Fericean
2016-05-24 21:01:34 +03:00
parent 0b074668d2
commit 202bb184dc
3 changed files with 368 additions and 180 deletions

View File

@@ -25,6 +25,7 @@ import (
"fmt"
"io/ioutil"
"strings"
"time"
"github.com/cgrates/cgrates/cache2go"
"github.com/cgrates/cgrates/utils"
@@ -86,21 +87,35 @@ var (
type MongoStorage struct {
session *mgo.Session
db *mgo.Database
db string
ms Marshaler
}
func (ms *MongoStorage) conn(col string) (*mgo.Session, *mgo.Collection) {
sessionCopy := ms.session.Copy()
return sessionCopy, sessionCopy.DB(ms.db).C(col)
}
func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string) (*MongoStorage, error) {
// We need this object to establish a session to our MongoDB.
address := fmt.Sprintf("%s:%s", host, port)
if user != "" && pass != "" {
address = fmt.Sprintf("%s:%s@%s", user, pass, address)
mongoDBDialInfo := &mgo.DialInfo{
Addrs: []string{address},
Timeout: 60 * time.Second,
Database: db,
Username: user,
Password: pass,
}
session, err := mgo.Dial(address)
// Create a session which maintains a pool of socket connections
// to our MongoDB.
session, err := mgo.DialWithInfo(mongoDBDialInfo)
if err != nil {
return nil, err
}
ndb := session.DB(db)
//session.SetMode(mgo.Monotonic, true)
session.SetMode(mgo.Monotonic, true)
index := mgo.Index{
Key: []string{"key"},
Unique: true, // Prevent two documents from having the same index key
@@ -260,7 +275,7 @@ func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string) (*
if err = ndb.C(utils.TBLSMCosts).EnsureIndex(index); err != nil {
return nil, err
}
return &MongoStorage{db: ndb, session: session, ms: NewCodecMsgpackMarshaler()}, err
return &MongoStorage{db: db, session: session, ms: NewCodecMsgpackMarshaler()}, err
}
func (ms *MongoStorage) Close() {
@@ -278,47 +293,50 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string, skipCache bool) ([]strin
}
var result []string
if skipCache {
session := ms.session.Copy()
defer session.Close()
db := session.DB(ms.db)
keyResult := struct{ Key string }{}
idResult := struct{ Id string }{}
switch category {
case utils.DESTINATION_PREFIX:
iter := ms.db.C(colDst).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter()
iter := db.C(colDst).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter()
for iter.Next(&keyResult) {
result = append(result, utils.DESTINATION_PREFIX+keyResult.Key)
}
return result, nil
case utils.RATING_PLAN_PREFIX:
iter := ms.db.C(colRpl).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter()
iter := db.C(colRpl).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter()
for iter.Next(&keyResult) {
result = append(result, utils.RATING_PLAN_PREFIX+keyResult.Key)
}
return result, nil
case utils.RATING_PROFILE_PREFIX:
iter := ms.db.C(colRpf).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
iter := db.C(colRpf).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
for iter.Next(&idResult) {
result = append(result, utils.RATING_PROFILE_PREFIX+idResult.Id)
}
return result, nil
case utils.ACTION_PREFIX:
iter := ms.db.C(colAct).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter()
iter := db.C(colAct).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter()
for iter.Next(&keyResult) {
result = append(result, utils.ACTION_PREFIX+keyResult.Key)
}
return result, nil
case utils.ACTION_PLAN_PREFIX:
iter := ms.db.C(colApl).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter()
iter := db.C(colApl).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter()
for iter.Next(&keyResult) {
result = append(result, utils.ACTION_PLAN_PREFIX+keyResult.Key)
}
return result, nil
case utils.ACTION_TRIGGER_PREFIX:
iter := ms.db.C(colAtr).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter()
iter := db.C(colAtr).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter()
for iter.Next(&keyResult) {
result = append(result, utils.ACTION_TRIGGER_PREFIX+keyResult.Key)
}
return result, nil
case utils.ACCOUNT_PREFIX:
iter := ms.db.C(colAcc).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
iter := db.C(colAcc).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
for iter.Next(&idResult) {
result = append(result, utils.ACCOUNT_PREFIX+idResult.Id)
}
@@ -330,12 +348,15 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string, skipCache bool) ([]strin
}
func (ms *MongoStorage) Flush(ignore string) (err error) {
collections, err := ms.db.CollectionNames()
session := ms.session.Copy()
defer session.Close()
db := session.DB(ms.db)
collections, err := db.CollectionNames()
if err != nil {
return err
}
for _, c := range collections {
if err = ms.db.C(c).DropCollection(); err != nil {
if err = db.C(c).DropCollection(); err != nil {
return err
}
}
@@ -390,17 +411,20 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.BeginTransaction()
keyResult := struct{ Key string }{}
idResult := struct{ Id string }{}
session := ms.session.Copy()
defer session.Close()
db := session.DB(ms.db)
if dKeys == nil || (float64(cache2go.CountEntries(utils.DESTINATION_PREFIX))*utils.DESTINATIONS_LOAD_THRESHOLD < float64(len(dKeys))) {
// if need to load more than a half of exiting keys load them all
utils.Logger.Info("Caching all destinations")
iter := ms.db.C(colDst).Find(nil).Select(bson.M{"key": 1}).Iter()
iter := db.C(colDst).Find(nil).Select(bson.M{"key": 1}).Iter()
dKeys = make([]string, 0)
for iter.Next(&keyResult) {
dKeys = append(dKeys, utils.DESTINATION_PREFIX+keyResult.Key)
}
if err := iter.Close(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("destinations: %s", err.Error())
}
cache2go.RemPrefixKey(utils.DESTINATION_PREFIX)
} else if len(dKeys) != 0 {
@@ -414,7 +438,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
}
if _, err = ms.GetDestination(key[len(utils.DESTINATION_PREFIX):]); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("destinations: %s", err.Error())
}
}
if len(dKeys) != 0 {
@@ -422,14 +446,14 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
}
if rpKeys == nil {
utils.Logger.Info("Caching all rating plans")
iter := ms.db.C(colRpl).Find(nil).Select(bson.M{"key": 1}).Iter()
iter := db.C(colRpl).Find(nil).Select(bson.M{"key": 1}).Iter()
rpKeys = make([]string, 0)
for iter.Next(&keyResult) {
rpKeys = append(rpKeys, utils.RATING_PLAN_PREFIX+keyResult.Key)
}
if err := iter.Close(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("rating plans: %s", err.Error())
}
cache2go.RemPrefixKey(utils.RATING_PLAN_PREFIX)
} else if len(rpKeys) != 0 {
@@ -439,7 +463,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.RemKey(key)
if _, err = ms.GetRatingPlan(key[len(utils.RATING_PLAN_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("rating plans: %s", err.Error())
}
}
if len(rpKeys) != 0 {
@@ -447,14 +471,14 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
}
if rpfKeys == nil {
utils.Logger.Info("Caching all rating profiles")
iter := ms.db.C(colRpf).Find(nil).Select(bson.M{"id": 1}).Iter()
iter := db.C(colRpf).Find(nil).Select(bson.M{"id": 1}).Iter()
rpfKeys = make([]string, 0)
for iter.Next(&idResult) {
rpfKeys = append(rpfKeys, utils.RATING_PROFILE_PREFIX+idResult.Id)
}
if err := iter.Close(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("rating profiles: %s", err.Error())
}
cache2go.RemPrefixKey(utils.RATING_PROFILE_PREFIX)
} else if len(rpfKeys) != 0 {
@@ -464,7 +488,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.RemKey(key)
if _, err = ms.GetRatingProfile(key[len(utils.RATING_PROFILE_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("rating profiles: %s", err.Error())
}
}
if len(rpfKeys) != 0 {
@@ -472,14 +496,14 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
}
if lcrKeys == nil {
utils.Logger.Info("Caching LCR rules.")
iter := ms.db.C(colLcr).Find(nil).Select(bson.M{"key": 1}).Iter()
iter := db.C(colLcr).Find(nil).Select(bson.M{"key": 1}).Iter()
lcrKeys = make([]string, 0)
for iter.Next(&keyResult) {
lcrKeys = append(lcrKeys, utils.LCR_PREFIX+keyResult.Key)
}
if err := iter.Close(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("lcr rules: %s", err.Error())
}
cache2go.RemPrefixKey(utils.LCR_PREFIX)
} else if len(lcrKeys) != 0 {
@@ -489,7 +513,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.RemKey(key)
if _, err = ms.GetLCR(key[len(utils.LCR_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("lcr rules: %s", err.Error())
}
}
if len(lcrKeys) != 0 {
@@ -498,14 +522,14 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
// DerivedChargers caching
if dcsKeys == nil {
utils.Logger.Info("Caching all derived chargers")
iter := ms.db.C(colDcs).Find(nil).Select(bson.M{"key": 1}).Iter()
iter := db.C(colDcs).Find(nil).Select(bson.M{"key": 1}).Iter()
dcsKeys = make([]string, 0)
for iter.Next(&keyResult) {
dcsKeys = append(dcsKeys, utils.DERIVEDCHARGERS_PREFIX+keyResult.Key)
}
if err := iter.Close(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("derived chargers: %s", err.Error())
}
cache2go.RemPrefixKey(utils.DERIVEDCHARGERS_PREFIX)
} else if len(dcsKeys) != 0 {
@@ -515,7 +539,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.RemKey(key)
if _, err = ms.GetDerivedChargers(key[len(utils.DERIVEDCHARGERS_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("derived chargers: %s", err.Error())
}
}
if len(dcsKeys) != 0 {
@@ -526,14 +550,14 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
}
if actKeys == nil {
utils.Logger.Info("Caching all actions")
iter := ms.db.C(colAct).Find(nil).Select(bson.M{"key": 1}).Iter()
iter := db.C(colAct).Find(nil).Select(bson.M{"key": 1}).Iter()
actKeys = make([]string, 0)
for iter.Next(&keyResult) {
actKeys = append(actKeys, utils.ACTION_PREFIX+keyResult.Key)
}
if err := iter.Close(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("actions: %s", err.Error())
}
cache2go.RemPrefixKey(utils.ACTION_PREFIX)
} else if len(actKeys) != 0 {
@@ -543,7 +567,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.RemKey(key)
if _, err = ms.GetActions(key[len(utils.ACTION_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("actions: %s", err.Error())
}
}
if len(actKeys) != 0 {
@@ -555,14 +579,14 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
}
if aplKeys == nil {
utils.Logger.Info("Caching all action plans")
iter := ms.db.C(colApl).Find(nil).Select(bson.M{"key": 1}).Iter()
iter := db.C(colApl).Find(nil).Select(bson.M{"key": 1}).Iter()
aplKeys = make([]string, 0)
for iter.Next(&keyResult) {
aplKeys = append(aplKeys, utils.ACTION_PLAN_PREFIX+keyResult.Key)
}
if err := iter.Close(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("action plans: %s", err.Error())
}
cache2go.RemPrefixKey(utils.ACTION_PLAN_PREFIX)
} else if len(aplKeys) != 0 {
@@ -572,7 +596,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.RemKey(key)
if _, err = ms.GetActionPlan(key[len(utils.ACTION_PLAN_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("action plans: %s", err.Error())
}
}
if len(aplKeys) != 0 {
@@ -584,14 +608,14 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
}
if shgKeys == nil {
utils.Logger.Info("Caching all shared groups")
iter := ms.db.C(colShg).Find(nil).Select(bson.M{"id": 1}).Iter()
iter := db.C(colShg).Find(nil).Select(bson.M{"id": 1}).Iter()
shgKeys = make([]string, 0)
for iter.Next(&idResult) {
shgKeys = append(shgKeys, utils.SHARED_GROUP_PREFIX+idResult.Id)
}
if err := iter.Close(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("shared groups: %s", err.Error())
}
} else if len(shgKeys) != 0 {
utils.Logger.Info(fmt.Sprintf("Caching shared groups: %v", shgKeys))
@@ -600,7 +624,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.RemKey(key)
if _, err = ms.GetSharedGroup(key[len(utils.SHARED_GROUP_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("shared groups: %s", err.Error())
}
}
if len(shgKeys) != 0 {
@@ -646,16 +670,19 @@ func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) {
if alsKeys == nil {
cache2go.RemPrefixKey(utils.ALIASES_PREFIX)
}
session := ms.session.Copy()
defer session.Close()
db := session.DB(ms.db)
if alsKeys == nil {
utils.Logger.Info("Caching all aliases")
iter := ms.db.C(colAls).Find(nil).Select(bson.M{"key": 1}).Iter()
iter := db.C(colAls).Find(nil).Select(bson.M{"key": 1}).Iter()
alsKeys = make([]string, 0)
for iter.Next(&keyResult) {
alsKeys = append(alsKeys, utils.ALIASES_PREFIX+keyResult.Key)
}
if err := iter.Close(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("aliases: %s", err.Error())
}
} else if len(alsKeys) != 0 {
utils.Logger.Info(fmt.Sprintf("Caching aliases: %v", alsKeys))
@@ -671,7 +698,7 @@ func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) {
cache2go.RemKey(key)
if _, err = ms.GetAlias(key[len(utils.ALIASES_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("aliases: %s", err.Error())
}
}
if len(alsKeys) != 0 {
@@ -688,24 +715,27 @@ func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) {
}
func (ms *MongoStorage) HasData(category, subject string) (bool, error) {
session := ms.session.Copy()
defer session.Close()
db := session.DB(ms.db)
switch category {
case utils.DESTINATION_PREFIX:
count, err := ms.db.C(colDst).Find(bson.M{"key": subject}).Count()
count, err := db.C(colDst).Find(bson.M{"key": subject}).Count()
return count > 0, err
case utils.RATING_PLAN_PREFIX:
count, err := ms.db.C(colRpl).Find(bson.M{"key": subject}).Count()
count, err := db.C(colRpl).Find(bson.M{"key": subject}).Count()
return count > 0, err
case utils.RATING_PROFILE_PREFIX:
count, err := ms.db.C(colRpf).Find(bson.M{"id": subject}).Count()
count, err := db.C(colRpf).Find(bson.M{"id": subject}).Count()
return count > 0, err
case utils.ACTION_PREFIX:
count, err := ms.db.C(colAct).Find(bson.M{"key": subject}).Count()
count, err := db.C(colAct).Find(bson.M{"key": subject}).Count()
return count > 0, err
case utils.ACTION_PLAN_PREFIX:
count, err := ms.db.C(colApl).Find(bson.M{"key": subject}).Count()
count, err := db.C(colApl).Find(bson.M{"key": subject}).Count()
return count > 0, err
case utils.ACCOUNT_PREFIX:
count, err := ms.db.C(colAcc).Find(bson.M{"id": subject}).Count()
count, err := db.C(colAcc).Find(bson.M{"id": subject}).Count()
return count > 0, err
}
return false, errors.New("unsupported category in HasData")
@@ -724,7 +754,9 @@ func (ms *MongoStorage) GetRatingPlan(key string, skipCache bool) (rp *RatingPla
Key string
Value []byte
}
err = ms.db.C(colRpl).Find(bson.M{"key": key}).One(&kv)
session, col := ms.conn(colRpl)
defer session.Close()
err = col.Find(bson.M{"key": key}).One(&kv)
if err == nil {
b := bytes.NewBuffer(kv.Value)
r, err := zlib.NewReader(b)
@@ -754,7 +786,9 @@ func (ms *MongoStorage) SetRatingPlan(rp *RatingPlan) error {
w := zlib.NewWriter(&b)
w.Write(result)
w.Close()
_, err = ms.db.C(colRpl).Upsert(bson.M{"key": rp.Id}, &struct {
session, col := ms.conn(colRpl)
defer session.Close()
_, err = col.Upsert(bson.M{"key": rp.Id}, &struct {
Key string
Value []byte
}{Key: rp.Id, Value: b.Bytes()})
@@ -774,7 +808,9 @@ func (ms *MongoStorage) GetRatingProfile(key string, skipCache bool) (rp *Rating
}
}
rp = new(RatingProfile)
err = ms.db.C(colRpf).Find(bson.M{"id": key}).One(rp)
session, col := ms.conn(colRpf)
defer session.Close()
err = col.Find(bson.M{"id": key}).One(rp)
if err == nil {
cache2go.Cache(utils.RATING_PROFILE_PREFIX+key, rp)
}
@@ -782,7 +818,9 @@ func (ms *MongoStorage) GetRatingProfile(key string, skipCache bool) (rp *Rating
}
func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile) error {
_, err := ms.db.C(colRpf).Upsert(bson.M{"id": rp.Id}, rp)
session, col := ms.conn(colRpf)
defer session.Close()
_, err := col.Upsert(bson.M{"id": rp.Id}, rp)
if err == nil && historyScribe != nil {
var response int
historyScribe.Call("HistoryV1.Record", rp.GetHistoryRecord(false), &response)
@@ -791,10 +829,12 @@ func (ms *MongoStorage) SetRatingProfile(rp *RatingProfile) error {
}
func (ms *MongoStorage) RemoveRatingProfile(key string) error {
iter := ms.db.C(colRpf).Find(bson.M{"id": bson.RegEx{Pattern: key + ".*", Options: ""}}).Select(bson.M{"id": 1}).Iter()
session, col := ms.conn(colRpf)
defer session.Close()
iter := col.Find(bson.M{"id": bson.RegEx{Pattern: key + ".*", Options: ""}}).Select(bson.M{"id": 1}).Iter()
var result struct{ Id string }
for iter.Next(&result) {
if err := ms.db.C(colRpf).Remove(bson.M{"id": result.Id}); err != nil {
if err := col.Remove(bson.M{"id": result.Id}); err != nil {
return err
}
cache2go.RemKey(utils.RATING_PROFILE_PREFIX + key)
@@ -819,7 +859,9 @@ func (ms *MongoStorage) GetLCR(key string, skipCache bool) (lcr *LCR, err error)
Key string
Value *LCR
}
err = ms.db.C(colLcr).Find(bson.M{"key": key}).One(&result)
session, col := ms.conn(colLcr)
defer session.Close()
err = col.Find(bson.M{"key": key}).One(&result)
if err == nil {
lcr = result.Value
cache2go.Cache(utils.LCR_PREFIX+key, lcr)
@@ -828,7 +870,9 @@ func (ms *MongoStorage) GetLCR(key string, skipCache bool) (lcr *LCR, err error)
}
func (ms *MongoStorage) SetLCR(lcr *LCR) error {
_, err := ms.db.C(colLcr).Upsert(bson.M{"key": lcr.GetId()}, &struct {
session, col := ms.conn(colLcr)
defer session.Close()
_, err := col.Upsert(bson.M{"key": lcr.GetId()}, &struct {
Key string
Value *LCR
}{lcr.GetId(), lcr})
@@ -841,7 +885,9 @@ func (ms *MongoStorage) GetDestination(key string) (result *Destination, err err
Key string
Value []byte
}
err = ms.db.C(colDst).Find(bson.M{"key": key}).One(&kv)
session, col := ms.conn(colDst)
defer session.Close()
err = col.Find(bson.M{"key": key}).One(&kv)
if err == nil {
b := bytes.NewBuffer(kv.Value)
r, err := zlib.NewReader(b)
@@ -877,7 +923,9 @@ func (ms *MongoStorage) SetDestination(dest *Destination) (err error) {
w := zlib.NewWriter(&b)
w.Write(result)
w.Close()
_, err = ms.db.C(colDst).Upsert(bson.M{"key": dest.Id}, &struct {
session, col := ms.conn(colDst)
defer session.Close()
_, err = col.Upsert(bson.M{"key": dest.Id}, &struct {
Key string
Value []byte
}{Key: dest.Id, Value: b.Bytes()})
@@ -904,7 +952,9 @@ func (ms *MongoStorage) GetActions(key string, skipCache bool) (as Actions, err
Key string
Value Actions
}
err = ms.db.C(colAct).Find(bson.M{"key": key}).One(&result)
session, col := ms.conn(colAct)
defer session.Close()
err = col.Find(bson.M{"key": key}).One(&result)
if err == nil {
as = result.Value
cache2go.Cache(utils.ACTION_PREFIX+key, as)
@@ -913,7 +963,9 @@ func (ms *MongoStorage) GetActions(key string, skipCache bool) (as Actions, err
}
func (ms *MongoStorage) SetActions(key string, as Actions) error {
_, err := ms.db.C(colAct).Upsert(bson.M{"key": key}, &struct {
session, col := ms.conn(colAct)
defer session.Close()
_, err := col.Upsert(bson.M{"key": key}, &struct {
Key string
Value Actions
}{Key: key, Value: as})
@@ -921,7 +973,9 @@ func (ms *MongoStorage) SetActions(key string, as Actions) error {
}
func (ms *MongoStorage) RemoveActions(key string) error {
return ms.db.C(colAct).Remove(bson.M{"key": key})
session, col := ms.conn(colAct)
defer session.Close()
return col.Remove(bson.M{"key": key})
}
func (ms *MongoStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGroup, err error) {
@@ -932,8 +986,10 @@ func (ms *MongoStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGr
return nil, err
}
}
session, col := ms.conn(colShg)
defer session.Close()
sg = &SharedGroup{}
err = ms.db.C(colShg).Find(bson.M{"id": key}).One(sg)
err = col.Find(bson.M{"id": key}).One(sg)
if err == nil {
cache2go.Cache(utils.SHARED_GROUP_PREFIX+key, sg)
}
@@ -941,13 +997,17 @@ func (ms *MongoStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGr
}
func (ms *MongoStorage) SetSharedGroup(sg *SharedGroup) (err error) {
_, err = ms.db.C(colShg).Upsert(bson.M{"id": sg.Id}, sg)
session, col := ms.conn(colShg)
defer session.Close()
_, err = col.Upsert(bson.M{"id": sg.Id}, sg)
return err
}
func (ms *MongoStorage) GetAccount(key string) (result *Account, err error) {
result = new(Account)
err = ms.db.C(colAcc).Find(bson.M{"id": key}).One(result)
session, col := ms.conn(colAcc)
defer session.Close()
err = col.Find(bson.M{"id": key}).One(result)
if err == mgo.ErrNotFound {
result = nil
}
@@ -967,12 +1027,16 @@ func (ms *MongoStorage) SetAccount(acc *Account) error {
acc = ac
}
}
_, err := ms.db.C(colAcc).Upsert(bson.M{"id": acc.ID}, acc)
session, col := ms.conn(colAcc)
defer session.Close()
_, err := col.Upsert(bson.M{"id": acc.ID}, acc)
return err
}
func (ms *MongoStorage) RemoveAccount(key string) error {
return ms.db.C(colAcc).Remove(bson.M{"id": key})
session, col := ms.conn(colAcc)
defer session.Close()
return col.Remove(bson.M{"id": key})
}
@@ -981,7 +1045,9 @@ func (ms *MongoStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error)
Key string
Value *StatsQueue
}
err = ms.db.C(colStq).Find(bson.M{"key": key}).One(&result)
session, col := ms.conn(colStq)
defer session.Close()
err = col.Find(bson.M{"key": key}).One(&result)
if err == nil {
sq = result.Value
}
@@ -989,7 +1055,9 @@ func (ms *MongoStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error)
}
func (ms *MongoStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
_, err = ms.db.C(colStq).Upsert(bson.M{"key": sq.GetId()}, &struct {
session, col := ms.conn(colStq)
defer session.Close()
_, err = col.Upsert(bson.M{"key": sq.GetId()}, &struct {
Key string
Value *StatsQueue
}{Key: sq.GetId(), Value: sq})
@@ -997,7 +1065,9 @@ func (ms *MongoStorage) SetCdrStatsQueue(sq *StatsQueue) (err error) {
}
func (ms *MongoStorage) GetSubscribers() (result map[string]*SubscriberData, err error) {
iter := ms.db.C(colPbs).Find(nil).Iter()
session, col := ms.conn(colPbs)
defer session.Close()
iter := col.Find(nil).Iter()
result = make(map[string]*SubscriberData)
var kv struct {
Key string
@@ -1011,7 +1081,9 @@ func (ms *MongoStorage) GetSubscribers() (result map[string]*SubscriberData, err
}
func (ms *MongoStorage) SetSubscriber(key string, sub *SubscriberData) (err error) {
_, err = ms.db.C(colPbs).Upsert(bson.M{"key": key}, &struct {
session, col := ms.conn(colPbs)
defer session.Close()
_, err = col.Upsert(bson.M{"key": key}, &struct {
Key string
Value *SubscriberData
}{Key: key, Value: sub})
@@ -1019,11 +1091,15 @@ func (ms *MongoStorage) SetSubscriber(key string, sub *SubscriberData) (err erro
}
func (ms *MongoStorage) RemoveSubscriber(key string) (err error) {
return ms.db.C(colPbs).Remove(bson.M{"key": key})
session, col := ms.conn(colPbs)
defer session.Close()
return col.Remove(bson.M{"key": key})
}
func (ms *MongoStorage) SetUser(up *UserProfile) (err error) {
_, err = ms.db.C(colUsr).Upsert(bson.M{"key": up.GetId()}, &struct {
session, col := ms.conn(colUsr)
defer session.Close()
_, err = col.Upsert(bson.M{"key": up.GetId()}, &struct {
Key string
Value *UserProfile
}{Key: up.GetId(), Value: up})
@@ -1035,7 +1111,9 @@ func (ms *MongoStorage) GetUser(key string) (up *UserProfile, err error) {
Key string
Value *UserProfile
}
err = ms.db.C(colUsr).Find(bson.M{"key": key}).One(&kv)
session, col := ms.conn(colUsr)
defer session.Close()
err = col.Find(bson.M{"key": key}).One(&kv)
if err == nil {
up = kv.Value
}
@@ -1043,7 +1121,9 @@ func (ms *MongoStorage) GetUser(key string) (up *UserProfile, err error) {
}
func (ms *MongoStorage) GetUsers() (result []*UserProfile, err error) {
iter := ms.db.C(colUsr).Find(nil).Iter()
session, col := ms.conn(colUsr)
defer session.Close()
iter := col.Find(nil).Iter()
var kv struct {
Key string
Value *UserProfile
@@ -1056,11 +1136,15 @@ func (ms *MongoStorage) GetUsers() (result []*UserProfile, err error) {
}
func (ms *MongoStorage) RemoveUser(key string) (err error) {
return ms.db.C(colUsr).Remove(bson.M{"key": key})
session, col := ms.conn(colUsr)
defer session.Close()
return col.Remove(bson.M{"key": key})
}
func (ms *MongoStorage) SetAlias(al *Alias) (err error) {
_, err = ms.db.C(colAls).Upsert(bson.M{"key": al.GetId()}, &struct {
session, col := ms.conn(colAls)
defer session.Close()
_, err = col.Upsert(bson.M{"key": al.GetId()}, &struct {
Key string
Value AliasValues
}{Key: al.GetId(), Value: al.Values})
@@ -1083,7 +1167,9 @@ func (ms *MongoStorage) GetAlias(key string, skipCache bool) (al *Alias, err err
Key string
Value AliasValues
}
if err = ms.db.C(colAls).Find(bson.M{"key": origKey}).One(&kv); err == nil {
session, col := ms.conn(colAls)
defer session.Close()
if err = col.Find(bson.M{"key": origKey}).One(&kv); err == nil {
al = &Alias{Values: kv.Value}
al.SetId(origKey)
if err == nil {
@@ -1104,10 +1190,12 @@ func (ms *MongoStorage) RemoveAlias(key string) (err error) {
Key string
Value AliasValues
}
if err := ms.db.C(colAls).Find(bson.M{"key": origKey}).One(&kv); err == nil {
session, col := ms.conn(colAls)
defer session.Close()
if err := col.Find(bson.M{"key": origKey}).One(&kv); err == nil {
al.Values = kv.Value
}
err = ms.db.C(colAls).Remove(bson.M{"key": origKey})
err = col.Remove(bson.M{"key": origKey})
if err == nil {
al.RemoveReverseCache()
cache2go.RemKey(key)
@@ -1135,7 +1223,9 @@ func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool) (loadInsts []*
Key string
Value []*LoadInstance
}
err = ms.db.C(colLht).Find(bson.M{"key": utils.LOADINST_KEY}).One(&kv)
session, col := ms.conn(colLht)
defer session.Close()
err = col.Find(bson.M{"key": utils.LOADINST_KEY}).One(&kv)
if err == nil {
loadInsts = kv.Value
cache2go.RemKey(utils.LOADINST_KEY)
@@ -1155,7 +1245,9 @@ func (ms *MongoStorage) AddLoadHistory(ldInst *LoadInstance, loadHistSize int) e
Key string
Value []*LoadInstance
}
err := ms.db.C(colLht).Find(bson.M{"key": utils.LOADINST_KEY}).One(&kv)
session, col := ms.conn(colLht)
defer session.Close()
err := col.Find(bson.M{"key": utils.LOADINST_KEY}).One(&kv)
if err != nil && err != mgo.ErrNotFound {
return err
@@ -1176,7 +1268,9 @@ func (ms *MongoStorage) AddLoadHistory(ldInst *LoadInstance, loadHistSize int) e
if histLen >= loadHistSize { // Have hit maximum history allowed, remove oldest element in order to add new one
existingLoadHistory = existingLoadHistory[:loadHistSize]
}
_, err = ms.db.C(colLht).Upsert(bson.M{"key": utils.LOADINST_KEY}, &struct {
session, col := ms.conn(colLht)
defer session.Close()
_, err = col.Upsert(bson.M{"key": utils.LOADINST_KEY}, &struct {
Key string
Value []*LoadInstance
}{Key: utils.LOADINST_KEY, Value: existingLoadHistory})
@@ -1190,7 +1284,9 @@ func (ms *MongoStorage) GetActionTriggers(key string) (atrs ActionTriggers, err
Key string
Value ActionTriggers
}
err = ms.db.C(colAtr).Find(bson.M{"key": key}).One(&kv)
session, col := ms.conn(colAtr)
defer session.Close()
err = col.Find(bson.M{"key": key}).One(&kv)
if err == nil {
atrs = kv.Value
}
@@ -1198,14 +1294,16 @@ func (ms *MongoStorage) GetActionTriggers(key string) (atrs ActionTriggers, err
}
func (ms *MongoStorage) SetActionTriggers(key string, atrs ActionTriggers) (err error) {
session, col := ms.conn(colAtr)
defer session.Close()
if len(atrs) == 0 {
err = ms.db.C(colAtr).Remove(bson.M{"key": key}) // delete the key
err = col.Remove(bson.M{"key": key}) // delete the key
if err != mgo.ErrNotFound {
return err
}
return nil
}
_, err = ms.db.C(colAtr).Upsert(bson.M{"key": key}, &struct {
_, err = col.Upsert(bson.M{"key": key}, &struct {
Key string
Value ActionTriggers
}{Key: key, Value: atrs})
@@ -1224,7 +1322,9 @@ func (ms *MongoStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl
Key string
Value []byte
}
err = ms.db.C(colApl).Find(bson.M{"key": key}).One(&kv)
session, col := ms.conn(colApl)
defer session.Close()
err = col.Find(bson.M{"key": key}).One(&kv)
if err == nil {
b := bytes.NewBuffer(kv.Value)
r, err := zlib.NewReader(b)
@@ -1246,10 +1346,12 @@ func (ms *MongoStorage) GetActionPlan(key string, skipCache bool) (ats *ActionPl
}
func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan, overwrite bool) error {
session, col := ms.conn(colApl)
defer session.Close()
// clean dots from account ids map
if len(ats.ActionTimings) == 0 {
cache2go.RemKey(utils.ACTION_PLAN_PREFIX + key)
err := ms.db.C(colApl).Remove(bson.M{"key": key})
err := col.Remove(bson.M{"key": key})
if err != mgo.ErrNotFound {
return err
}
@@ -1274,7 +1376,7 @@ func (ms *MongoStorage) SetActionPlan(key string, ats *ActionPlan, overwrite boo
w := zlib.NewWriter(&b)
w.Write(result)
w.Close()
_, err = ms.db.C(colApl).Upsert(bson.M{"key": key}, &struct {
_, err = col.Upsert(bson.M{"key": key}, &struct {
Key string
Value []byte
}{Key: key, Value: b.Bytes()})
@@ -1297,7 +1399,9 @@ func (ms *MongoStorage) GetAllActionPlans() (ats map[string]*ActionPlan, err err
}
func (ms *MongoStorage) PushTask(t *Task) error {
return ms.db.C(colTsk).Insert(bson.M{"_id": bson.NewObjectId(), "task": t})
session, col := ms.conn(colTsk)
defer session.Close()
return col.Insert(bson.M{"_id": bson.NewObjectId(), "task": t})
}
func (ms *MongoStorage) PopTask() (t *Task, err error) {
@@ -1305,8 +1409,10 @@ func (ms *MongoStorage) PopTask() (t *Task, err error) {
ID bson.ObjectId `bson:"_id"`
Task *Task
}{}
if err = ms.db.C(colTsk).Find(nil).One(&v); err == nil {
if remErr := ms.db.C(colTsk).Remove(bson.M{"_id": v.ID}); remErr != nil {
session, col := ms.conn(colTsk)
defer session.Close()
if err = col.Find(nil).One(&v); err == nil {
if remErr := col.Remove(bson.M{"_id": v.ID}); remErr != nil {
return nil, remErr
}
t = v.Task
@@ -1327,7 +1433,9 @@ func (ms *MongoStorage) GetDerivedChargers(key string, skipCache bool) (dcs *uti
Key string
Value *utils.DerivedChargers
}
err = ms.db.C(colDcs).Find(bson.M{"key": key}).One(&kv)
session, col := ms.conn(colDcs)
defer session.Close()
err = col.Find(bson.M{"key": key}).One(&kv)
if err == nil {
dcs = kv.Value
cache2go.Cache(utils.DERIVEDCHARGERS_PREFIX+key, dcs)
@@ -1338,13 +1446,17 @@ func (ms *MongoStorage) GetDerivedChargers(key string, skipCache bool) (dcs *uti
func (ms *MongoStorage) SetDerivedChargers(key string, dcs *utils.DerivedChargers) (err error) {
if dcs == nil || len(dcs.Chargers) == 0 {
cache2go.RemKey(utils.DERIVEDCHARGERS_PREFIX + key)
err = ms.db.C(colDcs).Remove(bson.M{"key": key})
session, col := ms.conn(colDcs)
defer session.Close()
err = col.Remove(bson.M{"key": key})
if err != mgo.ErrNotFound {
return err
}
return nil
}
_, err = ms.db.C(colDcs).Upsert(bson.M{"key": key}, &struct {
session, col := ms.conn(colDcs)
defer session.Close()
_, err = col.Upsert(bson.M{"key": key}, &struct {
Key string
Value *utils.DerivedChargers
}{Key: key, Value: dcs})
@@ -1352,18 +1464,24 @@ func (ms *MongoStorage) SetDerivedChargers(key string, dcs *utils.DerivedCharger
}
func (ms *MongoStorage) SetCdrStats(cs *CdrStats) error {
_, err := ms.db.C(colCrs).Upsert(bson.M{"id": cs.Id}, cs)
session, col := ms.conn(colCrs)
defer session.Close()
_, err := col.Upsert(bson.M{"id": cs.Id}, cs)
return err
}
func (ms *MongoStorage) GetCdrStats(key string) (cs *CdrStats, err error) {
cs = &CdrStats{}
err = ms.db.C(colCrs).Find(bson.M{"id": key}).One(cs)
session, col := ms.conn(colCrs)
defer session.Close()
err = col.Find(bson.M{"id": key}).One(cs)
return
}
func (ms *MongoStorage) GetAllCdrStats() (css []*CdrStats, err error) {
iter := ms.db.C(colCrs).Find(nil).Iter()
session, col := ms.conn(colCrs)
defer session.Close()
iter := col.Find(nil).Iter()
var cs CdrStats
for iter.Next(&cs) {
clone := cs // avoid using the same pointer in append
@@ -1374,7 +1492,9 @@ func (ms *MongoStorage) GetAllCdrStats() (css []*CdrStats, err error) {
}
func (ms *MongoStorage) SetStructVersion(v *StructVersion) (err error) {
_, err = ms.db.C(colVer).Upsert(bson.M{"key": utils.VERSION_PREFIX + "struct"}, &struct {
session, col := ms.conn(colVer)
defer session.Close()
_, err = col.Upsert(bson.M{"key": utils.VERSION_PREFIX + "struct"}, &struct {
Key string
Value *StructVersion
}{utils.VERSION_PREFIX + "struct", v})
@@ -1386,8 +1506,9 @@ func (ms *MongoStorage) GetStructVersion() (rsv *StructVersion, err error) {
Key string
Value StructVersion
}
err = ms.db.C(colVer).Find(bson.M{"key": utils.VERSION_PREFIX + "struct"}).One(&result)
session, col := ms.conn(colVer)
defer session.Close()
err = col.Find(bson.M{"key": utils.VERSION_PREFIX + "struct"}).One(&result)
if err == mgo.ErrNotFound {
rsv = nil
}

View File

@@ -12,14 +12,17 @@ import (
func (ms *MongoStorage) GetTpIds() ([]string, error) {
tpidMap := make(map[string]bool)
cols, err := ms.db.CollectionNames()
session := ms.session.Copy()
db := session.DB(ms.db)
defer session.Close()
cols, err := db.CollectionNames()
if err != nil {
return nil, err
}
for _, col := range cols {
if strings.HasPrefix(col, "tp_") {
tpids := make([]string, 0)
if err := ms.db.C(col).Find(nil).Select(bson.M{"tpid": 1}).Distinct("tpid", &tpids); err != nil {
if err := db.C(col).Find(nil).Select(bson.M{"tpid": 1}).Distinct("tpid", &tpids); err != nil {
return nil, err
}
for _, tpid := range tpids {
@@ -55,7 +58,9 @@ func (ms *MongoStorage) GetTpTableIds(tpid, table string, distinct utils.TPDisti
}
findMap["$and"] = []bson.M{bson.M{"$or": searchItems}}
}
q := ms.db.C(table).Find(findMap)
session, col := ms.conn(table)
defer session.Close()
q := col.Find(findMap)
if pag != nil {
if pag.Limit != nil {
q = q.Limit(*pag.Limit)
@@ -99,7 +104,9 @@ func (ms *MongoStorage) GetTpTimings(tpid, tag string) ([]TpTiming, error) {
filter["tag"] = tag
}
var results []TpTiming
err := ms.db.C(utils.TBL_TP_TIMINGS).Find(filter).All(&results)
session, col := ms.conn(utils.TBL_TP_TIMINGS)
defer session.Close()
err := col.Find(filter).All(&results)
return results, err
}
@@ -111,7 +118,9 @@ func (ms *MongoStorage) GetTpDestinations(tpid, tag string) ([]TpDestination, er
filter["tag"] = tag
}
var results []TpDestination
err := ms.db.C(utils.TBL_TP_DESTINATIONS).Find(filter).All(&results)
session, col := ms.conn(utils.TBL_TP_DESTINATIONS)
defer session.Close()
err := col.Find(filter).All(&results)
return results, err
}
@@ -123,7 +132,9 @@ func (ms *MongoStorage) GetTpRates(tpid, tag string) ([]TpRate, error) {
filter["tag"] = tag
}
var results []TpRate
err := ms.db.C(utils.TBL_TP_RATES).Find(filter).All(&results)
session, col := ms.conn(utils.TBL_TP_RATES)
defer session.Close()
err := col.Find(filter).All(&results)
return results, err
}
@@ -135,7 +146,9 @@ func (ms *MongoStorage) GetTpDestinationRates(tpid, tag string, pag *utils.Pagin
filter["tag"] = tag
}
var results []TpDestinationRate
q := ms.db.C(utils.TBL_TP_DESTINATION_RATES).Find(filter)
session, col := ms.conn(utils.TBL_TP_DESTINATION_RATES)
defer session.Close()
q := col.Find(filter)
if pag != nil {
if pag.Limit != nil {
q = q.Limit(*pag.Limit)
@@ -156,7 +169,9 @@ func (ms *MongoStorage) GetTpRatingPlans(tpid, tag string, pag *utils.Paginator)
filter["tag"] = tag
}
var results []TpRatingPlan
q := ms.db.C(utils.TBL_TP_RATING_PLANS).Find(filter)
session, col := ms.conn(utils.TBL_TP_RATING_PLANS)
defer session.Close()
q := col.Find(filter)
if pag != nil {
if pag.Limit != nil {
q = q.Limit(*pag.Limit)
@@ -187,7 +202,9 @@ func (ms *MongoStorage) GetTpRatingProfiles(tp *TpRatingProfile) ([]TpRatingProf
filter["loadid"] = tp.Loadid
}
var results []TpRatingProfile
err := ms.db.C(utils.TBL_TP_RATE_PROFILES).Find(filter).All(&results)
session, col := ms.conn(utils.TBL_TP_RATE_PROFILES)
defer session.Close()
err := col.Find(filter).All(&results)
return results, err
}
@@ -199,7 +216,9 @@ func (ms *MongoStorage) GetTpSharedGroups(tpid, tag string) ([]TpSharedGroup, er
filter["tag"] = tag
}
var results []TpSharedGroup
err := ms.db.C(utils.TBL_TP_SHARED_GROUPS).Find(filter).All(&results)
session, col := ms.conn(utils.TBL_TP_SHARED_GROUPS)
defer session.Close()
err := col.Find(filter).All(&results)
return results, err
}
@@ -211,7 +230,9 @@ func (ms *MongoStorage) GetTpCdrStats(tpid, tag string) ([]TpCdrstat, error) {
filter["tag"] = tag
}
var results []TpCdrstat
err := ms.db.C(utils.TBL_TP_CDR_STATS).Find(filter).All(&results)
session, col := ms.conn(utils.TBL_TP_CDR_STATS)
defer session.Close()
err := col.Find(filter).All(&results)
return results, err
}
func (ms *MongoStorage) GetTpLCRs(tp *TpLcrRule) ([]TpLcrRule, error) {
@@ -232,7 +253,9 @@ func (ms *MongoStorage) GetTpLCRs(tp *TpLcrRule) ([]TpLcrRule, error) {
filter["subject"] = tp.Subject
}
var results []TpLcrRule
err := ms.db.C(utils.TBL_TP_LCRS).Find(filter).All(&results)
session, col := ms.conn(utils.TBL_TP_LCRS)
defer session.Close()
err := col.Find(filter).All(&results)
return results, err
}
@@ -245,7 +268,9 @@ func (ms *MongoStorage) GetTpUsers(tp *TpUser) ([]TpUser, error) {
filter["username"] = tp.UserName
}
var results []TpUser
err := ms.db.C(utils.TBL_TP_USERS).Find(filter).All(&results)
session, col := ms.conn(utils.TBL_TP_USERS)
defer session.Close()
err := col.Find(filter).All(&results)
return results, err
}
@@ -270,7 +295,9 @@ func (ms *MongoStorage) GetTpAliases(tp *TpAlias) ([]TpAlias, error) {
filter["context"] = tp.Context
}
var results []TpAlias
err := ms.db.C(utils.TBL_TP_ALIASES).Find(filter).All(&results)
session, col := ms.conn(utils.TBL_TP_ALIASES)
defer session.Close()
err := col.Find(filter).All(&results)
return results, err
}
@@ -295,7 +322,9 @@ func (ms *MongoStorage) GetTpDerivedChargers(tp *TpDerivedCharger) ([]TpDerivedC
filter["loadid"] = tp.Loadid
}
var results []TpDerivedCharger
err := ms.db.C(utils.TBL_TP_DERIVED_CHARGERS).Find(filter).All(&results)
session, col := ms.conn(utils.TBL_TP_DERIVED_CHARGERS)
defer session.Close()
err := col.Find(filter).All(&results)
return results, err
}
@@ -307,7 +336,9 @@ func (ms *MongoStorage) GetTpActions(tpid, tag string) ([]TpAction, error) {
filter["tag"] = tag
}
var results []TpAction
err := ms.db.C(utils.TBL_TP_ACTIONS).Find(filter).All(&results)
session, col := ms.conn(utils.TBL_TP_ACTIONS)
defer session.Close()
err := col.Find(filter).All(&results)
return results, err
}
@@ -319,7 +350,9 @@ func (ms *MongoStorage) GetTpActionPlans(tpid, tag string) ([]TpActionPlan, erro
filter["tag"] = tag
}
var results []TpActionPlan
err := ms.db.C(utils.TBL_TP_ACTION_PLANS).Find(filter).All(&results)
session, col := ms.conn(utils.TBL_TP_ACTION_PLANS)
defer session.Close()
err := col.Find(filter).All(&results)
return results, err
}
@@ -331,7 +364,9 @@ func (ms *MongoStorage) GetTpActionTriggers(tpid, tag string) ([]TpActionTrigger
filter["tag"] = tag
}
var results []TpActionTrigger
err := ms.db.C(utils.TBL_TP_ACTION_TRIGGERS).Find(filter).All(&results)
session, col := ms.conn(utils.TBL_TP_ACTION_TRIGGERS)
defer session.Close()
err := col.Find(filter).All(&results)
return results, err
}
@@ -347,19 +382,24 @@ func (ms *MongoStorage) GetTpAccountActions(tp *TpAccountAction) ([]TpAccountAct
filter["loadid"] = tp.Loadid
}
var results []TpAccountAction
err := ms.db.C(utils.TBL_TP_ACCOUNT_ACTIONS).Find(filter).All(&results)
session, col := ms.conn(utils.TBL_TP_ACCOUNT_ACTIONS)
defer session.Close()
err := col.Find(filter).All(&results)
return results, err
}
func (ms *MongoStorage) RemTpData(table, tpid string, args map[string]string) error {
session := ms.session.Copy()
db := session.DB(ms.db)
defer session.Close()
if len(table) == 0 { // Remove tpid out of all tables
cols, err := ms.db.CollectionNames()
cols, err := db.CollectionNames()
if err != nil {
return err
}
for _, col := range cols {
if strings.HasPrefix(col, "tp_") {
if _, err := ms.db.C(col).RemoveAll(bson.M{"tpid": tpid}); err != nil {
if _, err := db.C(col).RemoveAll(bson.M{"tpid": tpid}); err != nil {
return err
}
}
@@ -371,7 +411,7 @@ func (ms *MongoStorage) RemTpData(table, tpid string, args map[string]string) er
args = make(map[string]string)
}
args["tpid"] = tpid
return ms.db.C(table).Remove(args)
return db.C(table).Remove(args)
}
func (ms *MongoStorage) SetTpTimings(tps []TpTiming) error {
@@ -379,8 +419,9 @@ func (ms *MongoStorage) SetTpTimings(tps []TpTiming) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_TIMINGS).Bulk()
session, col := ms.conn(utils.TBL_TP_TIMINGS)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.Tag]; !found {
m[tp.Tag] = true
@@ -396,8 +437,9 @@ func (ms *MongoStorage) SetTpDestinations(tps []TpDestination) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_DESTINATIONS).Bulk()
session, col := ms.conn(utils.TBL_TP_DESTINATIONS)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.Tag]; !found {
m[tp.Tag] = true
@@ -413,8 +455,9 @@ func (ms *MongoStorage) SetTpRates(tps []TpRate) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_RATES).Bulk()
session, col := ms.conn(utils.TBL_TP_RATES)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.Tag]; !found {
m[tp.Tag] = true
@@ -430,8 +473,9 @@ func (ms *MongoStorage) SetTpDestinationRates(tps []TpDestinationRate) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_DESTINATION_RATES).Bulk()
session, col := ms.conn(utils.TBL_TP_DESTINATION_RATES)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.Tag]; !found {
m[tp.Tag] = true
@@ -447,8 +491,9 @@ func (ms *MongoStorage) SetTpRatingPlans(tps []TpRatingPlan) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_RATING_PLANS).Bulk()
session, col := ms.conn(utils.TBL_TP_RATING_PLANS)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.Tag]; !found {
m[tp.Tag] = true
@@ -464,8 +509,9 @@ func (ms *MongoStorage) SetTpRatingProfiles(tps []TpRatingProfile) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_RATE_PROFILES).Bulk()
session, col := ms.conn(utils.TBL_TP_RATE_PROFILES)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.GetRatingProfileId()]; !found {
m[tp.GetRatingProfileId()] = true
@@ -488,8 +534,9 @@ func (ms *MongoStorage) SetTpSharedGroups(tps []TpSharedGroup) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_SHARED_GROUPS).Bulk()
session, col := ms.conn(utils.TBL_TP_SHARED_GROUPS)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.Tag]; !found {
m[tp.Tag] = true
@@ -505,8 +552,9 @@ func (ms *MongoStorage) SetTpCdrStats(tps []TpCdrstat) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_CDR_STATS).Bulk()
session, col := ms.conn(utils.TBL_TP_CDR_STATS)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.Tag]; !found {
m[tp.Tag] = true
@@ -522,8 +570,9 @@ func (ms *MongoStorage) SetTpUsers(tps []TpUser) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_USERS).Bulk()
session, col := ms.conn(utils.TBL_TP_USERS)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.GetId()]; !found {
m[tp.GetId()] = true
@@ -543,8 +592,9 @@ func (ms *MongoStorage) SetTpAliases(tps []TpAlias) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_ALIASES).Bulk()
session, col := ms.conn(utils.TBL_TP_ALIASES)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.GetId()]; !found {
m[tp.GetId()] = true
@@ -567,8 +617,9 @@ func (ms *MongoStorage) SetTpDerivedChargers(tps []TpDerivedCharger) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_DERIVED_CHARGERS).Bulk()
session, col := ms.conn(utils.TBL_TP_DERIVED_CHARGERS)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.GetDerivedChargersId()]; !found {
m[tp.GetDerivedChargersId()] = true
@@ -590,8 +641,9 @@ func (ms *MongoStorage) SetTpLCRs(tps []TpLcrRule) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_LCRS).Bulk()
session, col := ms.conn(utils.TBL_TP_LCRS)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.GetLcrRuleId()]; !found {
m[tp.GetLcrRuleId()] = true
@@ -613,8 +665,9 @@ func (ms *MongoStorage) SetTpActions(tps []TpAction) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_ACTIONS).Bulk()
session, col := ms.conn(utils.TBL_TP_ACTIONS)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.Tag]; !found {
m[tp.Tag] = true
@@ -630,8 +683,9 @@ func (ms *MongoStorage) SetTpActionPlans(tps []TpActionPlan) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_ACTION_PLANS).Bulk()
session, col := ms.conn(utils.TBL_TP_ACTION_PLANS)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.Tag]; !found {
m[tp.Tag] = true
@@ -647,8 +701,9 @@ func (ms *MongoStorage) SetTpActionTriggers(tps []TpActionTrigger) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_ACTION_TRIGGERS).Bulk()
session, col := ms.conn(utils.TBL_TP_ACTION_TRIGGERS)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.Tag]; !found {
m[tp.Tag] = true
@@ -664,8 +719,9 @@ func (ms *MongoStorage) SetTpAccountActions(tps []TpAccountAction) error {
return nil
}
m := make(map[string]bool)
tx := ms.db.C(utils.TBL_TP_ACCOUNT_ACTIONS).Bulk()
session, col := ms.conn(utils.TBL_TP_ACCOUNT_ACTIONS)
defer session.Close()
tx := col.Bulk()
for _, tp := range tps {
if found, _ := m[tp.GetAccountActionId()]; !found {
m[tp.GetAccountActionId()] = true
@@ -681,7 +737,9 @@ func (ms *MongoStorage) SetTpAccountActions(tps []TpAccountAction) error {
}
func (ms *MongoStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) {
return ms.db.C(colLogAtr).Insert(&struct {
session, col := ms.conn(colLogAtr)
defer session.Close()
return col.Insert(&struct {
ubId string
ActionTrigger *ActionTrigger
Actions Actions
@@ -691,7 +749,9 @@ func (ms *MongoStorage) LogActionTrigger(ubId, source string, at *ActionTrigger,
}
func (ms *MongoStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) {
return ms.db.C(colLogApl).Insert(&struct {
session, col := ms.conn(colLogApl)
defer session.Close()
return col.Insert(&struct {
ActionPlan *ActionTiming
Actions Actions
LogTime time.Time
@@ -700,7 +760,9 @@ func (ms *MongoStorage) LogActionTiming(source string, at *ActionTiming, as Acti
}
func (ms *MongoStorage) SetSMCost(smc *SMCost) error {
return ms.db.C(utils.TBLSMCosts).Insert(smc)
session, col := ms.conn(utils.TBLSMCosts)
defer session.Close()
return col.Insert(smc)
}
func (ms *MongoStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix string) (smcs []*SMCost, err error) {
@@ -709,7 +771,9 @@ func (ms *MongoStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri
filter = bson.M{OriginIDLow: bson.M{"$regex": bson.RegEx{Pattern: fmt.Sprintf("^%s", originIDPrefix)}}, OriginHostLow: originHost, RunIDLow: runid}
}
// Execute query
iter := ms.db.C(utils.TBLSMCosts).Find(filter).Iter()
session, col := ms.conn(utils.TBLSMCosts)
defer session.Close()
iter := col.Find(filter).Iter()
var smCost SMCost
for iter.Next(&smCost) {
smcs = append(smcs, &smCost)
@@ -724,10 +788,12 @@ func (ms *MongoStorage) SetCDR(cdr *CDR, allowUpdate bool) (err error) {
if cdr.OrderID == 0 {
cdr.OrderID = time.Now().UnixNano()
}
session, col := ms.conn(utils.TBL_CDRS)
defer session.Close()
if allowUpdate {
_, err = ms.db.C(utils.TBL_CDRS).Upsert(bson.M{CGRIDLow: cdr.CGRID, RunIDLow: cdr.RunID}, cdr)
_, err = col.Upsert(bson.M{CGRIDLow: cdr.CGRID, RunIDLow: cdr.RunID}, cdr)
} else {
err = ms.db.C(utils.TBL_CDRS).Insert(cdr)
err = col.Insert(cdr)
}
return err
}
@@ -764,7 +830,7 @@ func (ms *MongoStorage) cleanEmptyFilters(filters bson.M) {
}
}
// _, err := ms.db.C(utils.TBL_CDRS).UpdateAll(bson.M{CGRIDLow: bson.M{"$in": cgrIds}}, bson.M{"$set": bson.M{"deleted_at": time.Now()}})
// _, err := col(utils.TBL_CDRS).UpdateAll(bson.M{CGRIDLow: bson.M{"$in": cgrIds}}, bson.M{"$set": bson.M{"deleted_at": time.Now()}})
func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, int64, error) {
var minPDD, maxPDD, minUsage, maxUsage *time.Duration
if len(qryFltr.MinPDD) != 0 {
@@ -886,7 +952,8 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
}
//file.WriteString(fmt.Sprintf("AFTER: %v\n", utils.ToIJSON(filters)))
//file.Close()
col := ms.db.C(utils.TBL_CDRS)
session, col := ms.conn(utils.TBL_CDRS)
defer session.Close()
if remove {
if chgd, err := col.RemoveAll(filters); err != nil {
return nil, 0, err

View File

@@ -151,7 +151,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
utils.Logger.Info("Caching all destinations")
if dKeys, err = conn.Cmd("KEYS", utils.DESTINATION_PREFIX+"*").List(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("destinations: %s", err.Error())
}
cache2go.RemPrefixKey(utils.DESTINATION_PREFIX)
} else if len(dKeys) != 0 {
@@ -165,7 +165,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
}
if _, err = rs.GetDestination(key[len(utils.DESTINATION_PREFIX):]); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("destinations: %s", err.Error())
}
}
if len(dKeys) != 0 {
@@ -175,7 +175,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
utils.Logger.Info("Caching all rating plans")
if rpKeys, err = conn.Cmd("KEYS", utils.RATING_PLAN_PREFIX+"*").List(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("rating plans: %s", err.Error())
}
cache2go.RemPrefixKey(utils.RATING_PLAN_PREFIX)
} else if len(rpKeys) != 0 {
@@ -185,7 +185,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.RemKey(key)
if _, err = rs.GetRatingPlan(key[len(utils.RATING_PLAN_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("rating plans: %s", err.Error())
}
}
if len(rpKeys) != 0 {
@@ -195,7 +195,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
utils.Logger.Info("Caching all rating profiles")
if rpfKeys, err = conn.Cmd("KEYS", utils.RATING_PROFILE_PREFIX+"*").List(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("rating profiles: %s", err.Error())
}
cache2go.RemPrefixKey(utils.RATING_PROFILE_PREFIX)
} else if len(rpfKeys) != 0 {
@@ -205,7 +205,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.RemKey(key)
if _, err = rs.GetRatingProfile(key[len(utils.RATING_PROFILE_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("rating profiles: %s", err.Error())
}
}
if len(rpfKeys) != 0 {
@@ -215,7 +215,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
utils.Logger.Info("Caching LCR rules.")
if lcrKeys, err = conn.Cmd("KEYS", utils.LCR_PREFIX+"*").List(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("lcr rules: %s", err.Error())
}
cache2go.RemPrefixKey(utils.LCR_PREFIX)
} else if len(lcrKeys) != 0 {
@@ -225,7 +225,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.RemKey(key)
if _, err = rs.GetLCR(key[len(utils.LCR_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("lcr rules: %s", err.Error())
}
}
if len(lcrKeys) != 0 {
@@ -236,7 +236,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
utils.Logger.Info("Caching all derived chargers")
if dcsKeys, err = conn.Cmd("KEYS", utils.DERIVEDCHARGERS_PREFIX+"*").List(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("derived chargers: %s", err.Error())
}
cache2go.RemPrefixKey(utils.DERIVEDCHARGERS_PREFIX)
} else if len(dcsKeys) != 0 {
@@ -246,7 +246,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.RemKey(key)
if _, err = rs.GetDerivedChargers(key[len(utils.DERIVEDCHARGERS_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("derived chargers: %s", err.Error())
}
}
if len(dcsKeys) != 0 {
@@ -256,7 +256,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
utils.Logger.Info("Caching all actions")
if actKeys, err = conn.Cmd("KEYS", utils.ACTION_PREFIX+"*").List(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("actions: %s", err.Error())
}
cache2go.RemPrefixKey(utils.ACTION_PREFIX)
} else if len(actKeys) != 0 {
@@ -266,7 +266,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.RemKey(key)
if _, err = rs.GetActions(key[len(utils.ACTION_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("actions: %s", err.Error())
}
}
if len(actKeys) != 0 {
@@ -277,7 +277,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
utils.Logger.Info("Caching all action plans")
if aplKeys, err = rs.db.Cmd("KEYS", utils.ACTION_PLAN_PREFIX+"*").List(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf(" %s", err.Error())
}
cache2go.RemPrefixKey(utils.ACTION_PLAN_PREFIX)
} else if len(aplKeys) != 0 {
@@ -287,7 +287,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.RemKey(key)
if _, err = rs.GetActionPlan(key[len(utils.ACTION_PLAN_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf(" %s", err.Error())
}
}
if len(aplKeys) != 0 {
@@ -298,7 +298,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
utils.Logger.Info("Caching all shared groups")
if shgKeys, err = conn.Cmd("KEYS", utils.SHARED_GROUP_PREFIX+"*").List(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("shared groups: %s", err.Error())
}
cache2go.RemPrefixKey(utils.SHARED_GROUP_PREFIX)
} else if len(shgKeys) != 0 {
@@ -308,7 +308,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
cache2go.RemKey(key)
if _, err = rs.GetSharedGroup(key[len(utils.SHARED_GROUP_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("shared groups: %s", err.Error())
}
}
if len(shgKeys) != 0 {
@@ -360,7 +360,7 @@ func (rs *RedisStorage) cacheAccounting(alsKeys []string) (err error) {
utils.Logger.Info("Caching all aliases")
if alsKeys, err = conn.Cmd("KEYS", utils.ALIASES_PREFIX+"*").List(); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("aliases: %s", err.Error())
}
cache2go.RemPrefixKey(utils.ALIASES_PREFIX)
cache2go.RemPrefixKey(utils.REVERSE_ALIASES_PREFIX)
@@ -379,7 +379,7 @@ func (rs *RedisStorage) cacheAccounting(alsKeys []string) (err error) {
cache2go.RemKey(key)
if _, err = rs.GetAlias(key[len(utils.ALIASES_PREFIX):], true); err != nil {
cache2go.RollbackTransaction()
return err
return fmt.Errorf("aliases: %s", err.Error())
}
}
if len(alsKeys) != 0 {