mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Timings in DataDB
This commit is contained in:
@@ -64,6 +64,7 @@ type CacheConfig struct {
|
||||
ReverseAliases *CacheParamConfig
|
||||
DerivedChargers *CacheParamConfig
|
||||
ResourceLimits *CacheParamConfig
|
||||
Timings *CacheParamConfig
|
||||
}
|
||||
|
||||
func (self *CacheConfig) loadFromJsonCfg(jsnCfg *CacheJsonCfg) error {
|
||||
|
||||
@@ -65,6 +65,7 @@ const CGRATES_CFG_JSON = `
|
||||
"reverse_aliases": {"limit": 10000, "ttl":"0s", "precache": false}, // control reverse aliases index caching
|
||||
"derived_chargers": {"limit": 10000, "ttl":"0s", "precache": false}, // control derived charging rule caching
|
||||
"resource_limits": {"limit": 10000, "ttl":"0s", "precache": false}, // control resource limits caching
|
||||
"timings": {"limit": 10000, "ttl":"0s", "precache": false}, // control timings caching
|
||||
},
|
||||
|
||||
|
||||
@@ -362,7 +363,7 @@ const CGRATES_CFG_JSON = `
|
||||
"listen_acct": "127.0.0.1:1813", // address where to listen for radius accounting requests <x.y.z.y:1234>
|
||||
"client_secrets": { // hash containing secrets for clients connecting here <*default|$client_ip>
|
||||
"*default": "CGRateS.org"
|
||||
},
|
||||
},
|
||||
"client_dictionaries": { // per client path towards directory holding additional dictionaries to load (extra to RFC)
|
||||
"*default": "/usr/share/cgrates/radius/dict/", // key represents the client IP or catch-all <*default|$client_ip>
|
||||
},
|
||||
|
||||
@@ -61,6 +61,7 @@ var sTestsOnStorIT = []func(t *testing.T){
|
||||
testOnStorITCacheAlias,
|
||||
testOnStorITCacheReverseAlias,
|
||||
testOnStorITCacheResourceLimit,
|
||||
testOnStorITCacheTiming,
|
||||
// ToDo: test cache flush for a prefix
|
||||
// ToDo: testOnStorITLoadAccountingCache
|
||||
testOnStorITHasData,
|
||||
@@ -83,6 +84,7 @@ var sTestsOnStorIT = []func(t *testing.T){
|
||||
testOnStorITCRUDAlias,
|
||||
testOnStorITCRUDReverseAlias,
|
||||
testOnStorITCRUDResourceLimit,
|
||||
testOnStorITCRUDTiming,
|
||||
testOnStorITCRUDHistory,
|
||||
testOnStorITCRUDStructVersion,
|
||||
}
|
||||
@@ -759,6 +761,33 @@ func testOnStorITCacheResourceLimit(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testOnStorITCacheTiming(t *testing.T) {
|
||||
tmg := &utils.TPTiming{
|
||||
ID: "TEST_TMG",
|
||||
Years: utils.Years{2016, 2017},
|
||||
Months: utils.Months{time.January, time.February, time.March},
|
||||
MonthDays: utils.MonthDays{1, 2, 3, 4},
|
||||
WeekDays: utils.WeekDays{},
|
||||
StartTime: "00:00:00",
|
||||
EndTime: "",
|
||||
}
|
||||
|
||||
if err := onStor.SetTiming(tmg, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, hasIt := cache.Get(utils.TimingsPrefix + tmg.ID); hasIt {
|
||||
t.Error("Already in cache")
|
||||
}
|
||||
if err := onStor.CacheDataFromDB(utils.TimingsPrefix, []string{tmg.ID}, false); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if itm, hasIt := cache.Get(utils.TimingsPrefix + tmg.ID); !hasIt {
|
||||
t.Error("Did not cache")
|
||||
} else if rcv := itm.(*utils.TPTiming); !reflect.DeepEqual(tmg, rcv) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", tmg, rcv)
|
||||
}
|
||||
}
|
||||
|
||||
func testOnStorITHasData(t *testing.T) {
|
||||
rp := &RatingPlan{
|
||||
Id: "HasData",
|
||||
@@ -1691,6 +1720,51 @@ func testOnStorITCRUDResourceLimit(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testOnStorITCRUDTiming(t *testing.T) {
|
||||
tmg := &utils.TPTiming{
|
||||
ID: "TEST",
|
||||
Years: utils.Years{2016, 2017},
|
||||
Months: utils.Months{time.January, time.February, time.March},
|
||||
MonthDays: utils.MonthDays{1, 2, 3, 4},
|
||||
WeekDays: utils.WeekDays{},
|
||||
StartTime: "00:00:00",
|
||||
EndTime: "",
|
||||
}
|
||||
if _, rcvErr := onStor.GetTiming(tmg.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
if err := onStor.SetTiming(tmg, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rcv, err := onStor.GetTiming(tmg.ID, true, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(tmg, rcv) {
|
||||
t.Errorf("Expecting: %v, received: %v", tmg, rcv)
|
||||
}
|
||||
// FixMe
|
||||
// if err = onStor.SelectDatabase("13"); err != nil {
|
||||
// t.Error(err)
|
||||
// }
|
||||
// if _, rcvErr := onStor.GetTiming(tmg.ID, false, utils.NonTransactional); rcvErr != utils.ErrNotFound {
|
||||
// t.Error(rcvErr)
|
||||
// }
|
||||
//
|
||||
if rcv, err := onStor.GetTiming(tmg.ID, false, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(tmg, rcv) {
|
||||
t.Errorf("Expecting: %v, received: %v", tmg, rcv)
|
||||
}
|
||||
// if err = onStor.SelectDatabase(onStorCfg); err != nil {
|
||||
// t.Error(err)
|
||||
// }
|
||||
if err := onStor.RemoveTiming(tmg.ID, utils.NonTransactional); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, rcvErr := onStor.GetTiming(tmg.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
}
|
||||
|
||||
func testOnStorITCRUDHistory(t *testing.T) {
|
||||
time := time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local()
|
||||
ist := &utils.LoadInstance{"Load", "RatingLoad", "Account", time}
|
||||
@@ -1724,6 +1798,7 @@ func testOnStorITCRUDStructVersion(t *testing.T) {
|
||||
Cdrs: "1",
|
||||
SMCosts: "1",
|
||||
ResourceLimits: "1",
|
||||
Timings: "1",
|
||||
}
|
||||
if _, rcvErr := onStor.GetStructVersion(); rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
|
||||
@@ -100,6 +100,9 @@ type DataDB interface {
|
||||
GetResourceLimit(string, bool, string) (*ResourceLimit, error)
|
||||
SetResourceLimit(*ResourceLimit, string) error
|
||||
RemoveResourceLimit(string, string) error
|
||||
GetTiming(string, bool, string) (*utils.TPTiming, error)
|
||||
SetTiming(*utils.TPTiming, string) error
|
||||
RemoveTiming(string, string) error
|
||||
GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error)
|
||||
AddLoadHistory(*utils.LoadInstance, int, string) error
|
||||
GetStructVersion() (*StructVersion, error)
|
||||
|
||||
@@ -265,7 +265,8 @@ func (ms *MapStorage) CacheDataFromDB(prefix string, IDs []string, mustBeCached
|
||||
utils.LCR_PREFIX,
|
||||
utils.ALIASES_PREFIX,
|
||||
utils.REVERSE_ALIASES_PREFIX,
|
||||
utils.ResourceLimitsPrefix}, prefix) {
|
||||
utils.ResourceLimitsPrefix,
|
||||
utils.TimingsPrefix}, prefix) {
|
||||
return utils.NewCGRError(utils.REDIS,
|
||||
utils.MandatoryIEMissingCaps,
|
||||
utils.UnsupportedCachePrefix,
|
||||
@@ -317,6 +318,8 @@ func (ms *MapStorage) CacheDataFromDB(prefix string, IDs []string, mustBeCached
|
||||
nrItems = ms.cacheCfg.ReverseAliases.Limit
|
||||
case utils.ResourceLimitsPrefix:
|
||||
nrItems = ms.cacheCfg.ResourceLimits.Limit
|
||||
case utils.TimingsPrefix:
|
||||
nrItems = ms.cacheCfg.Timings.Limit
|
||||
}
|
||||
if nrItems != 0 && nrItems < len(IDs) {
|
||||
IDs = IDs[:nrItems]
|
||||
@@ -357,6 +360,8 @@ func (ms *MapStorage) CacheDataFromDB(prefix string, IDs []string, mustBeCached
|
||||
_, err = ms.GetReverseAlias(dataID, true, utils.NonTransactional)
|
||||
case utils.ResourceLimitsPrefix:
|
||||
_, err = ms.GetResourceLimit(dataID, true, utils.NonTransactional)
|
||||
case utils.TimingsPrefix:
|
||||
_, err = ms.GetTiming(dataID, true, utils.NonTransactional)
|
||||
}
|
||||
if err != nil {
|
||||
return utils.NewCGRError(utils.REDIS,
|
||||
@@ -1406,6 +1411,7 @@ func (ms *MapStorage) GetResourceLimit(id string, skipCache bool, transactionID
|
||||
cache.Set(key, rl, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) SetResourceLimit(rl *ResourceLimit, transactionID string) error {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
@@ -1427,6 +1433,55 @@ func (ms *MapStorage) RemoveResourceLimit(id string, transactionID string) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetTiming(id string, skipCache bool, transactionID string) (t *utils.TPTiming, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
key := utils.TimingsPrefix + id
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x != nil {
|
||||
return x.(*utils.TPTiming), nil
|
||||
}
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
}
|
||||
cCommit := cacheCommit(transactionID)
|
||||
if values, ok := ms.dict[key]; ok {
|
||||
t = new(utils.TPTiming)
|
||||
if err = ms.ms.Unmarshal(values, &t); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
cache.Set(key, nil, cCommit, transactionID)
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
cache.Set(key, t, cCommit, transactionID)
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
func (ms *MapStorage) SetTiming(t *utils.TPTiming, transactionID string) error {
|
||||
fmt.Println("Map")
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
result, err := ms.ms.Marshal(t)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key := utils.TimingsPrefix + t.ID
|
||||
ms.dict[key] = result
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MapStorage) RemoveTiming(id string, transactionID string) error {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
key := utils.TimingsPrefix + id
|
||||
delete(ms.dict, key)
|
||||
cache.RemKey(key, cacheCommit(transactionID), transactionID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
|
||||
@@ -58,6 +58,7 @@ const (
|
||||
colVer = "versions"
|
||||
colRL = "resource_limits"
|
||||
colRFI = "request_filter_indexes"
|
||||
colTmg = "timings"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -322,6 +323,7 @@ func (ms *MongoStorage) getColNameForPrefix(prefix string) (name string, ok bool
|
||||
utils.LOADINST_KEY: colLht,
|
||||
utils.VERSION_PREFIX: colVer,
|
||||
utils.ResourceLimitsPrefix: colRL,
|
||||
utils.TimingsPrefix: colTmg,
|
||||
}
|
||||
name, ok = colMap[prefix]
|
||||
return
|
||||
@@ -462,7 +464,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
utils.LCR_PREFIX,
|
||||
utils.ALIASES_PREFIX,
|
||||
utils.REVERSE_ALIASES_PREFIX,
|
||||
utils.ResourceLimitsPrefix}, prfx) {
|
||||
utils.ResourceLimitsPrefix,
|
||||
utils.TimingsPrefix}, prfx) {
|
||||
return utils.NewCGRError(utils.MONGO,
|
||||
utils.MandatoryIEMissingCaps,
|
||||
utils.UnsupportedCachePrefix,
|
||||
@@ -514,6 +517,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
nrItems = ms.cacheCfg.ReverseAliases.Limit
|
||||
case utils.ResourceLimitsPrefix:
|
||||
nrItems = ms.cacheCfg.ResourceLimits.Limit
|
||||
case utils.TimingsPrefix:
|
||||
nrItems = ms.cacheCfg.Timings.Limit
|
||||
}
|
||||
if nrItems != 0 && nrItems < len(ids) { // More ids than cache config allows it, limit here
|
||||
ids = ids[:nrItems]
|
||||
@@ -554,6 +559,8 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
_, err = ms.GetReverseAlias(dataID, true, utils.NonTransactional)
|
||||
case utils.ResourceLimitsPrefix:
|
||||
_, err = ms.GetResourceLimit(dataID, true, utils.NonTransactional)
|
||||
case utils.TimingsPrefix:
|
||||
_, err = ms.GetTiming(dataID, true, utils.NonTransactional)
|
||||
}
|
||||
if err != nil {
|
||||
return utils.NewCGRError(utils.MONGO,
|
||||
@@ -654,6 +661,11 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er
|
||||
for iter.Next(&idResult) {
|
||||
result = append(result, utils.AccountActionPlansPrefix+keyResult.Key)
|
||||
}
|
||||
case utils.TimingsPrefix:
|
||||
iter := db.C(colRL).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
for iter.Next(&idResult) {
|
||||
result = append(result, utils.TimingsPrefix+idResult.Id)
|
||||
}
|
||||
default:
|
||||
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix)
|
||||
}
|
||||
@@ -1897,6 +1909,48 @@ func (ms *MongoStorage) RemoveResourceLimit(id string, transactionID string) (er
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTiming(id string, skipCache bool, transactionID string) (t *utils.TPTiming, err error) {
|
||||
key := utils.TimingsPrefix + id
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*utils.TPTiming), nil
|
||||
}
|
||||
}
|
||||
session, col := ms.conn(colTmg)
|
||||
defer session.Close()
|
||||
t = new(utils.TPTiming)
|
||||
if err = col.Find(bson.M{"id": id}).One(t); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
err = utils.ErrNotFound
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
cache.Set(key, t, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTiming(t *utils.TPTiming, transactionID string) (err error) {
|
||||
fmt.Println("Mongo")
|
||||
session, col := ms.conn(colTmg)
|
||||
defer session.Close()
|
||||
_, err = col.Upsert(bson.M{"id": t.ID}, t)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) RemoveTiming(id string, transactionID string) (err error) {
|
||||
session, col := ms.conn(colTmg)
|
||||
defer session.Close()
|
||||
if err = col.Remove(bson.M{"id": id}); err != nil {
|
||||
return
|
||||
}
|
||||
cache.RemKey(utils.TimingsPrefix+id, cacheCommit(transactionID), transactionID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) {
|
||||
session, col := ms.conn(colRFI)
|
||||
defer session.Close()
|
||||
|
||||
@@ -229,7 +229,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
utils.LCR_PREFIX,
|
||||
utils.ALIASES_PREFIX,
|
||||
utils.REVERSE_ALIASES_PREFIX,
|
||||
utils.ResourceLimitsPrefix}, prfx) {
|
||||
utils.ResourceLimitsPrefix,
|
||||
utils.TimingsPrefix}, prfx) {
|
||||
return utils.NewCGRError(utils.REDIS,
|
||||
utils.MandatoryIEMissingCaps,
|
||||
utils.UnsupportedCachePrefix,
|
||||
@@ -281,6 +282,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
nrItems = rs.cacheCfg.ReverseAliases.Limit
|
||||
case utils.ResourceLimitsPrefix:
|
||||
nrItems = rs.cacheCfg.ResourceLimits.Limit
|
||||
case utils.TimingsPrefix:
|
||||
nrItems = rs.cacheCfg.Timings.Limit
|
||||
}
|
||||
if nrItems != 0 && nrItems < len(ids) {
|
||||
ids = ids[:nrItems]
|
||||
@@ -321,6 +324,8 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
|
||||
_, err = rs.GetReverseAlias(dataID, true, utils.NonTransactional)
|
||||
case utils.ResourceLimitsPrefix:
|
||||
_, err = rs.GetResourceLimit(dataID, true, utils.NonTransactional)
|
||||
case utils.TimingsPrefix:
|
||||
_, err = rs.GetTiming(dataID, true, utils.NonTransactional)
|
||||
}
|
||||
if err != nil {
|
||||
return utils.NewCGRError(utils.REDIS,
|
||||
@@ -1439,6 +1444,49 @@ func (rs *RedisStorage) RemoveResourceLimit(id string, transactionID string) (er
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetTiming(id string, skipCache bool, transactionID string) (t *utils.TPTiming, err error) {
|
||||
key := utils.TimingsPrefix + id
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return x.(*utils.TPTiming), nil
|
||||
}
|
||||
}
|
||||
var values []byte
|
||||
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
|
||||
if err.Error() == "wrong type" { // did not find the destination
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
err = utils.ErrNotFound
|
||||
}
|
||||
return
|
||||
}
|
||||
if err = rs.ms.Unmarshal(values, &t); err != nil {
|
||||
return
|
||||
}
|
||||
cache.Set(key, t, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetTiming(t *utils.TPTiming, transactionID string) error {
|
||||
fmt.Println("Redis")
|
||||
result, err := rs.ms.Marshal(t)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return rs.Cmd("SET", utils.TimingsPrefix+t.ID, result).Err
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) RemoveTiming(id string, transactionID string) (err error) {
|
||||
key := utils.TimingsPrefix + id
|
||||
if err = rs.Cmd("DEL", key).Err; err != nil {
|
||||
return
|
||||
}
|
||||
cache.RemKey(key, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) {
|
||||
mp, err := rs.Cmd("HGETALL", dbKey).Map()
|
||||
if err != nil {
|
||||
|
||||
@@ -1902,6 +1902,17 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
|
||||
log.Print("\t", rl.ID)
|
||||
}
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Timings:")
|
||||
}
|
||||
for _, t := range tpr.timings {
|
||||
if err = tpr.dataStorage.SetTiming(t, utils.NonTransactional); err != nil {
|
||||
return err
|
||||
}
|
||||
if verbose {
|
||||
log.Print("\t", t.ID)
|
||||
}
|
||||
}
|
||||
if !disable_reverse {
|
||||
if len(tpr.destinations) > 0 {
|
||||
if verbose {
|
||||
|
||||
@@ -74,6 +74,7 @@ var (
|
||||
Cdrs: "1",
|
||||
SMCosts: "1",
|
||||
ResourceLimits: "1",
|
||||
Timings: "1",
|
||||
}
|
||||
)
|
||||
|
||||
@@ -99,6 +100,7 @@ type StructVersion struct {
|
||||
Cdrs string
|
||||
SMCosts string
|
||||
ResourceLimits string
|
||||
Timings string
|
||||
}
|
||||
|
||||
type MigrationInfo struct {
|
||||
|
||||
@@ -213,6 +213,7 @@ const (
|
||||
REVERSE_ALIASES_PREFIX = "rls_"
|
||||
ResourceLimitsPrefix = "rlm_"
|
||||
ResourceLimitsIndex = "rli_"
|
||||
TimingsPrefix = "tmg_"
|
||||
CDR_STATS_PREFIX = "cst_"
|
||||
TEMP_DESTINATION_PREFIX = "tmp_"
|
||||
LOG_CALL_COST_PREFIX = "cco_"
|
||||
|
||||
Reference in New Issue
Block a user