mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-13 02:56:24 +05:00
save only the parts being cached
This commit is contained in:
@@ -78,7 +78,7 @@ func CacheCommitTransaction() {
|
||||
transactionMux.Unlock()
|
||||
}
|
||||
|
||||
func CacheSave(path string, keys []string, cfi *utils.CacheFileInfo) error {
|
||||
func CacheSave(path string, keys map[string][]string, cfi *utils.CacheFileInfo) error {
|
||||
mux.Lock()
|
||||
defer mux.Unlock()
|
||||
return cache.Save(path, keys, cfi)
|
||||
|
||||
@@ -2,193 +2,28 @@ package engine
|
||||
|
||||
import "github.com/cgrates/cgrates/utils"
|
||||
|
||||
type cacheKeyValue interface {
|
||||
Key() string
|
||||
Value() interface{}
|
||||
}
|
||||
|
||||
type mapKeyValue struct {
|
||||
K string
|
||||
V map[string]struct{}
|
||||
}
|
||||
|
||||
func (mkv *mapKeyValue) Key() string {
|
||||
return mkv.K
|
||||
}
|
||||
|
||||
func (mkv *mapKeyValue) Value() interface{} {
|
||||
return mkv.V
|
||||
}
|
||||
|
||||
type rpKeyValue struct {
|
||||
K string
|
||||
V *RatingPlan
|
||||
}
|
||||
|
||||
func (mkv *rpKeyValue) Key() string {
|
||||
return mkv.K
|
||||
}
|
||||
|
||||
func (mkv *rpKeyValue) Value() interface{} {
|
||||
return mkv.V
|
||||
}
|
||||
|
||||
type rpfKeyValue struct {
|
||||
K string
|
||||
V *RatingProfile
|
||||
}
|
||||
|
||||
func (mkv *rpfKeyValue) Key() string {
|
||||
return mkv.K
|
||||
}
|
||||
|
||||
func (mkv *rpfKeyValue) Value() interface{} {
|
||||
return mkv.V
|
||||
}
|
||||
|
||||
type lcrKeyValue struct {
|
||||
K string
|
||||
V *LCR
|
||||
}
|
||||
|
||||
func (mkv *lcrKeyValue) Key() string {
|
||||
return mkv.K
|
||||
}
|
||||
|
||||
func (mkv *lcrKeyValue) Value() interface{} {
|
||||
return mkv.V
|
||||
}
|
||||
|
||||
type dcKeyValue struct {
|
||||
K string
|
||||
V *utils.DerivedChargers
|
||||
}
|
||||
|
||||
func (mkv *dcKeyValue) Key() string {
|
||||
return mkv.K
|
||||
}
|
||||
|
||||
func (mkv *dcKeyValue) Value() interface{} {
|
||||
return mkv.V
|
||||
}
|
||||
|
||||
type acsKeyValue struct {
|
||||
K string
|
||||
V Actions
|
||||
}
|
||||
|
||||
func (mkv *acsKeyValue) Key() string {
|
||||
return mkv.K
|
||||
}
|
||||
|
||||
func (mkv *acsKeyValue) Value() interface{} {
|
||||
return mkv.V
|
||||
}
|
||||
|
||||
type aplKeyValue struct {
|
||||
K string
|
||||
V *ActionPlan
|
||||
}
|
||||
|
||||
func (mkv *aplKeyValue) Key() string {
|
||||
return mkv.K
|
||||
}
|
||||
|
||||
func (mkv *aplKeyValue) Value() interface{} {
|
||||
return mkv.V
|
||||
}
|
||||
|
||||
type sgKeyValue struct {
|
||||
K string
|
||||
V *SharedGroup
|
||||
}
|
||||
|
||||
func (mkv *sgKeyValue) Key() string {
|
||||
return mkv.K
|
||||
}
|
||||
|
||||
func (mkv *sgKeyValue) Value() interface{} {
|
||||
return mkv.V
|
||||
}
|
||||
|
||||
type alsKeyValue struct {
|
||||
K string
|
||||
V AliasValues
|
||||
}
|
||||
|
||||
func (mkv *alsKeyValue) Key() string {
|
||||
return mkv.K
|
||||
}
|
||||
|
||||
func (mkv *alsKeyValue) Value() interface{} {
|
||||
return mkv.V
|
||||
}
|
||||
|
||||
type loadKeyValue struct {
|
||||
K string
|
||||
V []*utils.LoadInstance
|
||||
}
|
||||
|
||||
func (mkv *loadKeyValue) Key() string {
|
||||
return mkv.K
|
||||
}
|
||||
|
||||
func (mkv *loadKeyValue) Value() interface{} {
|
||||
return mkv.V
|
||||
}
|
||||
|
||||
func CacheTypeFactory(prefix string, key string, value interface{}) cacheKeyValue {
|
||||
func CacheTypeFactory(prefix string) interface{} {
|
||||
switch prefix {
|
||||
case utils.DESTINATION_PREFIX:
|
||||
if value != nil {
|
||||
return &mapKeyValue{key, value.(map[string]struct{})}
|
||||
}
|
||||
return &mapKeyValue{"", make(map[string]struct{})}
|
||||
return make(map[string]struct{})
|
||||
case utils.RATING_PLAN_PREFIX:
|
||||
if value != nil {
|
||||
return &rpKeyValue{key, value.(*RatingPlan)}
|
||||
}
|
||||
return &rpfKeyValue{"", &RatingProfile{}}
|
||||
return &RatingPlan{}
|
||||
case utils.RATING_PROFILE_PREFIX:
|
||||
if value != nil {
|
||||
return &rpfKeyValue{key, value.(*RatingProfile)}
|
||||
}
|
||||
return &rpfKeyValue{"", &RatingProfile{}}
|
||||
return &RatingProfile{}
|
||||
case utils.LCR_PREFIX:
|
||||
if value != nil {
|
||||
return &lcrKeyValue{key, value.(*LCR)}
|
||||
}
|
||||
return &lcrKeyValue{"", &LCR{}}
|
||||
return &LCR{}
|
||||
case utils.DERIVEDCHARGERS_PREFIX:
|
||||
if value != nil {
|
||||
return &dcKeyValue{key, value.(*utils.DerivedChargers)}
|
||||
}
|
||||
return &dcKeyValue{"", &utils.DerivedChargers{}}
|
||||
return &utils.DerivedChargers{}
|
||||
case utils.ACTION_PREFIX:
|
||||
if value != nil {
|
||||
return &acsKeyValue{key, value.(Actions)}
|
||||
}
|
||||
return &acsKeyValue{"", Actions{}}
|
||||
return Actions{}
|
||||
case utils.ACTION_PLAN_PREFIX:
|
||||
if value != nil {
|
||||
return &aplKeyValue{key, value.(*ActionPlan)}
|
||||
}
|
||||
return &aplKeyValue{"", &ActionPlan{}}
|
||||
return &ActionPlan{}
|
||||
case utils.SHARED_GROUP_PREFIX:
|
||||
if value != nil {
|
||||
return &sgKeyValue{key, value.(*SharedGroup)}
|
||||
}
|
||||
return &sgKeyValue{"", &SharedGroup{}}
|
||||
return &SharedGroup{}
|
||||
case utils.ALIASES_PREFIX:
|
||||
if value != nil {
|
||||
return &alsKeyValue{key, value.(AliasValues)}
|
||||
}
|
||||
return &alsKeyValue{"", AliasValues{}}
|
||||
return AliasValues{}
|
||||
case utils.LOADINST_KEY[:PREFIX_LEN]:
|
||||
if value != nil {
|
||||
return &loadKeyValue{key, value.([]*utils.LoadInstance)}
|
||||
}
|
||||
return &loadKeyValue{"", make([]*utils.LoadInstance, 0)}
|
||||
return make([]*utils.LoadInstance, 0)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ type cacheStore interface {
|
||||
CountEntriesForPrefix(string) int
|
||||
GetAllForPrefix(string) (map[string]interface{}, error)
|
||||
GetKeysForPrefix(string) []string
|
||||
Save(string, []string, *utils.CacheFileInfo) error
|
||||
Save(string, map[string][]string, *utils.CacheFileInfo) error
|
||||
Load(string, []string) error
|
||||
}
|
||||
|
||||
@@ -120,9 +120,10 @@ func (cs cacheDoubleStore) GetKeysForPrefix(prefix string) (keys []string) {
|
||||
return
|
||||
}
|
||||
|
||||
func (cs cacheDoubleStore) Save(path string, prefixes []string, cfi *utils.CacheFileInfo) error {
|
||||
func (cs cacheDoubleStore) Save(path string, prefixKeysMap map[string][]string, cfi *utils.CacheFileInfo) error {
|
||||
start := time.Now()
|
||||
//log.Printf("path: %s prefixes: %v", path, prefixes)
|
||||
if path == "" || len(prefixes) == 0 {
|
||||
if path == "" || len(prefixKeysMap) == 0 {
|
||||
return nil
|
||||
}
|
||||
//log.Print("saving cache prefixes: ", prefixes)
|
||||
@@ -133,24 +134,35 @@ func (cs cacheDoubleStore) Save(path string, prefixes []string, cfi *utils.Cache
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, prefix := range prefixes {
|
||||
var prefixSlice []string
|
||||
for prefix, keys := range prefixKeysMap {
|
||||
prefix = prefix[:PREFIX_LEN]
|
||||
mapValue, found := cs[prefix]
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
prefixSlice = append(prefixSlice, prefix)
|
||||
wg.Add(1)
|
||||
go func(key string, data map[string]interface{}) {
|
||||
go func(pref string, ks []string, data map[string]interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
dataEncoder := NewCodecMsgpackMarshaler()
|
||||
db, err := leveldb.OpenFile(filepath.Join(path, key+".cache"), nil)
|
||||
db, err := leveldb.OpenFile(filepath.Join(path, pref+".cache"), nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
for k, v := range data {
|
||||
// destinations are reverse mapped
|
||||
if pref == utils.DESTINATION_PREFIX {
|
||||
ks = make([]string, len(cs[utils.DESTINATION_PREFIX]))
|
||||
i := 0
|
||||
for dk := range cs[utils.DESTINATION_PREFIX] {
|
||||
ks[i] = dk
|
||||
i++
|
||||
}
|
||||
}
|
||||
for _, k := range ks {
|
||||
v := data[k]
|
||||
if encData, err := dataEncoder.Marshal(v); err == nil {
|
||||
if len(encData) > 1000 {
|
||||
var buf bytes.Buffer
|
||||
@@ -165,9 +177,11 @@ func (cs cacheDoubleStore) Save(path string, prefixes []string, cfi *utils.Cache
|
||||
break
|
||||
}
|
||||
}
|
||||
}(prefix, mapValue)
|
||||
}(prefix, keys, mapValue)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("Cache %v save time: %v", prefixSlice, time.Since(start)))
|
||||
return utils.SaveCacheFileInfo(path, cfi)
|
||||
}
|
||||
|
||||
@@ -221,8 +235,7 @@ func (cs cacheDoubleStore) Load(path string, prefixes []string) error {
|
||||
} else {
|
||||
encData = data
|
||||
}
|
||||
kv := CacheTypeFactory(key, "", nil)
|
||||
v := kv.Value()
|
||||
v := CacheTypeFactory(key)
|
||||
if err := dataDecoder.Unmarshal(encData, &v); err != nil {
|
||||
//log.Printf("%s err5", key)
|
||||
utils.Logger.Info("<cache decoder>: " + err.Error())
|
||||
@@ -366,7 +379,7 @@ func (cs cacheSimpleStore) GetKeysForPrefix(prefix string) (keys []string) {
|
||||
return
|
||||
}
|
||||
|
||||
func (cs cacheSimpleStore) Save(path string, keys []string, cfi *utils.CacheFileInfo) error {
|
||||
func (cs cacheSimpleStore) Save(path string, keys map[string][]string, cfi *utils.CacheFileInfo) error {
|
||||
utils.Logger.Info("simplestore save")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -665,30 +665,30 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
|
||||
utils.Logger.Info(fmt.Sprintf("error saving load history: %v (%v)", loadHist, err))
|
||||
return err
|
||||
}
|
||||
var keys []string
|
||||
keys := make(map[string][]string)
|
||||
if len(dKeys) > 0 {
|
||||
keys = append(keys, utils.DESTINATION_PREFIX)
|
||||
keys[utils.DESTINATION_PREFIX] = dKeys
|
||||
}
|
||||
if len(rpKeys) > 0 {
|
||||
keys = append(keys, utils.RATING_PLAN_PREFIX)
|
||||
keys[utils.RATING_PLAN_PREFIX] = rpKeys
|
||||
}
|
||||
if len(rpfKeys) > 0 {
|
||||
keys = append(keys, utils.RATING_PROFILE_PREFIX)
|
||||
keys[utils.RATING_PROFILE_PREFIX] = rpfKeys
|
||||
}
|
||||
if len(lcrKeys) > 0 {
|
||||
keys = append(keys, utils.LCR_PREFIX)
|
||||
keys[utils.LCR_PREFIX] = lcrKeys
|
||||
}
|
||||
if len(actKeys) > 0 {
|
||||
keys = append(keys, utils.ACTION_PREFIX)
|
||||
keys[utils.ACTION_PREFIX] = actKeys
|
||||
}
|
||||
if len(actKeys) > 0 {
|
||||
keys = append(keys, utils.DERIVEDCHARGERS_PREFIX)
|
||||
if len(dcsKeys) > 0 {
|
||||
keys[utils.DERIVEDCHARGERS_PREFIX] = dcsKeys
|
||||
}
|
||||
if len(aplKeys) > 0 {
|
||||
keys = append(keys, utils.ACTION_PLAN_PREFIX)
|
||||
keys[utils.ACTION_PLAN_PREFIX] = aplKeys
|
||||
}
|
||||
if len(shgKeys) > 0 {
|
||||
keys = append(keys, utils.SHARED_GROUP_PREFIX)
|
||||
keys[utils.SHARED_GROUP_PREFIX] = shgKeys
|
||||
}
|
||||
return CacheSave(ms.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist})
|
||||
}
|
||||
@@ -773,9 +773,9 @@ func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) {
|
||||
utils.Logger.Info("Finished load history caching.")
|
||||
CacheCommitTransaction()
|
||||
utils.Logger.Info(fmt.Sprintf("Cache accounting creation time: %v", time.Since(start)))
|
||||
var keys []string
|
||||
keys := make(map[string][]string)
|
||||
if len(alsKeys) > 0 {
|
||||
keys = append(keys, utils.ALIASES_PREFIX)
|
||||
keys[utils.ALIASES_PREFIX] = alsKeys
|
||||
}
|
||||
|
||||
var loadHist *utils.LoadInstance
|
||||
|
||||
@@ -340,30 +340,30 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
|
||||
return err
|
||||
}
|
||||
|
||||
var keys []string
|
||||
keys := make(map[string][]string)
|
||||
if len(dKeys) > 0 {
|
||||
keys = append(keys, utils.DESTINATION_PREFIX)
|
||||
keys[utils.DESTINATION_PREFIX] = dKeys
|
||||
}
|
||||
if len(rpKeys) > 0 {
|
||||
keys = append(keys, utils.RATING_PLAN_PREFIX)
|
||||
keys[utils.RATING_PLAN_PREFIX] = rpKeys
|
||||
}
|
||||
if len(rpfKeys) > 0 {
|
||||
keys = append(keys, utils.RATING_PROFILE_PREFIX)
|
||||
keys[utils.RATING_PROFILE_PREFIX] = rpfKeys
|
||||
}
|
||||
if len(lcrKeys) > 0 {
|
||||
keys = append(keys, utils.LCR_PREFIX)
|
||||
keys[utils.LCR_PREFIX] = lcrKeys
|
||||
}
|
||||
if len(actKeys) > 0 {
|
||||
keys = append(keys, utils.ACTION_PREFIX)
|
||||
keys[utils.ACTION_PREFIX] = actKeys
|
||||
}
|
||||
if len(actKeys) > 0 {
|
||||
keys = append(keys, utils.DERIVEDCHARGERS_PREFIX)
|
||||
if len(dcsKeys) > 0 {
|
||||
keys[utils.DERIVEDCHARGERS_PREFIX] = dcsKeys
|
||||
}
|
||||
if len(aplKeys) > 0 {
|
||||
keys = append(keys, utils.ACTION_PLAN_PREFIX)
|
||||
keys[utils.ACTION_PLAN_PREFIX] = aplKeys
|
||||
}
|
||||
if len(shgKeys) > 0 {
|
||||
keys = append(keys, utils.SHARED_GROUP_PREFIX)
|
||||
keys[utils.SHARED_GROUP_PREFIX] = shgKeys
|
||||
}
|
||||
return CacheSave(rs.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist})
|
||||
}
|
||||
@@ -443,9 +443,9 @@ func (rs *RedisStorage) cacheAccounting(alsKeys []string) (err error) {
|
||||
utils.Logger.Info("Finished load history caching.")
|
||||
CacheCommitTransaction()
|
||||
utils.Logger.Info(fmt.Sprintf("Cache accounting creation time: %v", time.Since(start)))
|
||||
var keys []string
|
||||
keys := make(map[string][]string)
|
||||
if len(alsKeys) > 0 {
|
||||
keys = append(keys, utils.ALIASES_PREFIX)
|
||||
keys[utils.ALIASES_PREFIX] = alsKeys
|
||||
}
|
||||
loadHistList, err := rs.GetLoadHistory(1, true)
|
||||
if err != nil || len(loadHistList) == 0 {
|
||||
|
||||
Reference in New Issue
Block a user