Merge pull request #1816 from Trial97/master

Updated MongoStorage
This commit is contained in:
Dan Christian Bogos
2019-12-12 11:17:39 +01:00
committed by GitHub
6 changed files with 67 additions and 75 deletions

View File

@@ -165,7 +165,7 @@ const CGRATES_CFG_JSON = `
"*timings": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // timings caching
"*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resource profiles caching
"*resources": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resources caching
"*event_resources": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // matching resources to events
"*event_resources": {"limit": -1, "ttl": "", "static_ttl": false}, // matching resources to events
"*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // statqueue profiles
"*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // statqueues with metrics
"*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control threshold profiles caching
@@ -183,13 +183,13 @@ const CGRATES_CFG_JSON = `
"*attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control attribute filter indexes caching
"*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control charger filter indexes caching
"*dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control dispatcher filter indexes caching
"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control dispatcher routes caching
"*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false, "precache": false}, // diameter messages caching
"*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false, "precache": false}, // RPC responses caching
"*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false, "precache": false}, // closed sessions cached for CDRs
"*cdr_ids": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // protects CDRs against double-charging
"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false}, // control dispatcher routes caching
"*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false}, // diameter messages caching
"*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false}, // RPC responses caching
"*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false}, // closed sessions cached for CDRs
"*cdr_ids": {"limit": -1, "ttl": "", "static_ttl": false}, // protects CDRs against double-charging
"*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control the load_ids for items
"*rpc_connections": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // RPC connections caching
"*rpc_connections": {"limit": -1, "ttl": "", "static_ttl": false}, // RPC connections caching
},

View File

@@ -106,8 +106,7 @@ func TestCacheJsonCfg(t *testing.T) {
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
utils.CacheEventResources: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)},
utils.CacheStatQueueProfiles: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
@@ -153,26 +152,20 @@ func TestCacheJsonCfg(t *testing.T) {
utils.CacheDispatcherFilterIndexes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)},
utils.CacheDispatcherRoutes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)},
utils.CacheDiameterMessages: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("3h"), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
Ttl: utils.StringPointer("3h"), Static_ttl: utils.BoolPointer(false)},
utils.CacheRPCResponses: &CacheParamJsonCfg{Limit: utils.IntPointer(0),
Ttl: utils.StringPointer("2s"), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
Ttl: utils.StringPointer("2s"), Static_ttl: utils.BoolPointer(false)},
utils.CacheClosedSessions: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("10s"), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
Ttl: utils.StringPointer("10s"), Static_ttl: utils.BoolPointer(false)},
utils.CacheCDRIDs: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)},
utils.CacheLoadIDs: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
utils.CacheRPCConnections: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Precache: utils.BoolPointer(false)},
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)},
}
if gCfg, err := dfCgrJsonCfg.CacheJsonCfg(); err != nil {

View File

@@ -144,7 +144,7 @@
// "*timings": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // timings caching
// "*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resource profiles caching
// "*resources": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control resources caching
// "*event_resources": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // matching resources to events
// "*event_resources": {"limit": -1, "ttl": "", "static_ttl": false}, // matching resources to events
// "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // statqueue profiles
// "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // statqueues with metrics
// "*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control threshold profiles caching
@@ -162,13 +162,13 @@
// "*attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control attribute filter indexes caching
// "*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control charger filter indexes caching
// "*dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control dispatcher filter indexes caching
// "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control dispatcher routes caching
// "*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false, "precache": false}, // diameter messages caching
// "*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false, "precache": false}, // RPC responses caching
// "*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false, "precache": false}, // closed sessions cached for CDRs
// "*cdr_ids": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // protects CDRs against double-charging
// "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false}, // control dispatcher routes caching
// "*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false}, // diameter messages caching
// "*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false}, // RPC responses caching
// "*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false}, // closed sessions cached for CDRs
// "*cdr_ids": {"limit": -1, "ttl": "", "static_ttl": false}, // protects CDRs against double-charging
// "*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // control the load_ids for items
// "*rpc_connections": {"limit": -1, "ttl": "", "static_ttl": false, "precache": false}, // RPC connections caching
// "*rpc_connections": {"limit": -1, "ttl": "", "static_ttl": false}, // RPC connections caching
// },

View File

@@ -473,46 +473,46 @@ func TestCDRParseFieldValue(t *testing.T) {
}
}
func TestCdrClone(t * testing.T){
func TestCdrClone(t *testing.T) {
cdr := &CDR{}
eOut := &CDR{}
if rcv := cdr.Clone(); !reflect.DeepEqual(rcv,eOut) {
t.Errorf("Expecting: %+v, received: %+v",eOut,rcv)
if rcv := cdr.Clone(); !reflect.DeepEqual(rcv, eOut) {
t.Errorf("Expecting: %+v, received: %+v", eOut, rcv)
}
cdr = &CDR{
CGRID: "CGRID_test",
OrderID: 18,
CGRID: "CGRID_test",
OrderID: 18,
SetupTime: time.Date(2020, time.April, 18, 23, 0, 4, 0, time.UTC),
Usage: time.Duration(10),
Usage: time.Duration(10),
ExtraFields: map[string]string{
"test1":"_test1_",
"test2":"_test2_",
"test1": "_test1_",
"test2": "_test2_",
},
Partial: true,
Cost: 0.74,
Cost: 0.74,
CostDetails: &EventCost{
CGRID: "EventCost_CGRID",
Cost: utils.Float64Pointer(0.74),
Cost: utils.Float64Pointer(0.74),
},
}
eOut = &CDR{
CGRID: "CGRID_test",
OrderID: 18,
CGRID: "CGRID_test",
OrderID: 18,
SetupTime: time.Date(2020, time.April, 18, 23, 0, 4, 0, time.UTC),
Usage: time.Duration(10),
Usage: time.Duration(10),
ExtraFields: map[string]string{
"test1":"_test1_",
"test2":"_test2_",
"test1": "_test1_",
"test2": "_test2_",
},
Partial: true,
Cost: 0.74,
Cost: 0.74,
CostDetails: &EventCost{
CGRID: "EventCost_CGRID",
Cost: utils.Float64Pointer(0.74),
Cost: utils.Float64Pointer(0.74),
},
}
if rcv := cdr.Clone(); !reflect.DeepEqual(rcv,eOut) {
t.Errorf("Expecting: %+v,\n received: %+v",eOut,rcv)
if rcv := cdr.Clone(); !reflect.DeepEqual(rcv, eOut) {
t.Errorf("Expecting: %+v,\n received: %+v", eOut, rcv)
}
}

View File

@@ -192,13 +192,13 @@ func (ec *EventCost) Clone() (cln *EventCost) {
if ec.AccountSummary != nil {
cln.AccountSummary = ec.AccountSummary.Clone()
}
if ec.Rating != nil {
if ec.Rating != nil {
cln.Rating = ec.Rating.Clone()
}
if ec.Accounting != nil {
if ec.Accounting != nil {
cln.Accounting = ec.Accounting.Clone()
}
if ec.RatingFilters != nil {
if ec.RatingFilters != nil {
cln.RatingFilters = ec.RatingFilters.Clone()
}
if ec.Rates != nil {

View File

@@ -173,27 +173,19 @@ func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes
}
if err = ms.query(func(sctx mongo.SessionContext) error {
col, err := ms.client.Database(dbName).ListCollections(sctx, bson.D{}, options.ListCollections().SetNameOnly(true))
cols, err := ms.client.Database(dbName).ListCollectionNames(sctx, bson.D{})
if err != nil {
return err
}
empty := true
for col.Next(sctx) { // create indexes only if database is empty or only version table is present
var elem struct{ Name string }
err := col.Decode(&elem)
if err != nil {
return err
}
if elem.Name != ColVer {
for _, col := range cols { // create indexes only if database is empty or only version table is present
if col != ColVer {
empty = false
break
}
}
col.Close(sctx)
if empty {
if err = ms.EnsureIndexes(); err != nil {
return err
}
return ms.EnsureIndexes()
}
return nil
}); err != nil {
@@ -236,9 +228,9 @@ func (ms *MongoStorage) enusureIndex(colName string, uniq bool, keys ...string)
return ms.query(func(sctx mongo.SessionContext) error {
col := ms.getCol(colName)
io := options.Index().SetUnique(uniq)
var doc bsonx.Doc
doc := make(bson.M)
for _, k := range keys {
doc = doc.Append(k, bsonx.Int32(1))
doc[k] = 1
}
_, err := col.Indexes().CreateOne(sctx, mongo.IndexModel{
Keys: doc,
@@ -260,6 +252,7 @@ func (ms *MongoStorage) getCol(col string) *mongo.Collection {
return ms.client.Database(ms.db).Collection(col)
}
// GetContext returns the context used for the current DB
func (ms *MongoStorage) GetContext() context.Context {
return ms.ctx
}
@@ -406,7 +399,10 @@ func (ms *MongoStorage) Close() {
// Flush drops the datatable
func (ms *MongoStorage) Flush(ignore string) (err error) {
return ms.query(func(sctx mongo.SessionContext) error {
return ms.client.Database(ms.db).Drop(sctx)
if err = ms.client.Database(ms.db).Drop(sctx); err != nil {
return err
}
return ms.EnsureIndexes() // recreate the indexes
})
}
@@ -526,20 +522,23 @@ func (ms *MongoStorage) RemoveReverseForPrefix(prefix string) (err error) {
// IsDBEmpty implementation
func (ms *MongoStorage) IsDBEmpty() (resp bool, err error) {
err = ms.query(func(sctx mongo.SessionContext) error {
col, err := ms.DB().ListCollections(sctx, bson.D{})
cols, err := ms.DB().ListCollectionNames(sctx, bson.D{})
if err != nil {
return err
}
if resp = !col.Next(sctx); resp {
return nil
for _, col := range cols {
if col == utils.CDRsTBL { // ignore cdrs collection
continue
}
var count int64
if count, err = ms.getCol(col).CountDocuments(sctx, bson.D{}, options.Count().SetLimit(1)); err != nil { // check if collection is empty so limit the count to 1
return err
}
if count != 0 {
return nil
}
}
elem := bson.D{}
err = col.Decode(&elem)
if err != nil {
return err
}
resp = (elem.Map()["name"] == "cdrs")
col.Close(sctx)
resp = true
return nil
})
return resp, err