Improved cache dump handling

Radu Ioan Fericean
2016-07-06 22:29:32 +03:00
parent 257ecc51c1
commit 706c397cc6
7 changed files with 185 additions and 215 deletions

View File

@@ -21,7 +21,6 @@ package v1
import (
"errors"
"fmt"
"log"
"os"
"path"
"strconv"
@@ -1024,7 +1023,6 @@ func (arrp *AttrRemoveRatingProfile) GetId() (result string) {
}
func (self *ApierV1) RemoveRatingProfile(attr AttrRemoveRatingProfile, reply *string) error {
log.Printf("ATTR: %+v", attr)
if attr.Direction == "" {
attr.Direction = utils.OUT
}
@@ -1034,7 +1032,6 @@ func (self *ApierV1) RemoveRatingProfile(attr AttrRemoveRatingProfile, reply *st
return utils.ErrMandatoryIeMissing
}
_, err := engine.Guardian.Guard(func() (interface{}, error) {
log.Print("RPID: ", attr.GetId())
err := self.RatingDb.RemoveRatingProfile(attr.GetId())
if err != nil {
return 0, err

View File

@@ -1,11 +1,7 @@
//Simple caching library with expiration capabilities
package engine
import (
"sync"
"github.com/cgrates/cgrates/utils"
)
import "sync"
const (
PREFIX_LEN = 4
@@ -25,6 +21,7 @@ var (
transactionMux sync.Mutex
transactionON = false
transactionLock = false
dumper *cacheDumper
)
type transactionItem struct {
@@ -33,6 +30,13 @@ type transactionItem struct {
kind string
}
func CacheSetDumperPath(path string) (err error) {
if dumper == nil {
dumper, err = newCacheDumper(path)
}
return
}
func init() {
if DOUBLE_CACHE {
cache = newDoubleStore()
@@ -78,12 +82,6 @@ func CacheCommitTransaction() {
transactionMux.Unlock()
}
func CacheSave(path string, keys map[string][]string, cfi *utils.CacheFileInfo) error {
mux.Lock()
defer mux.Unlock()
return cache.Save(path, keys, cfi)
}
func CacheLoad(path string, keys []string) error {
if !transactionLock {
mux.Lock()
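
CacheSetDumperPath lazily creates the package-level dumper and is a no-op once one exists, so the first storage backend to configure a dump directory wins and later calls are ignored. A minimal sketch of the intended call pattern (the directory paths here are hypothetical):

// Wire the dumper once at startup, e.g. from a storage constructor.
if err := CacheSetDumperPath("/var/lib/cgrates/cache_dump"); err != nil {
	utils.Logger.Info("<cache dumper> init error: " + err.Error())
}
// A second call returns nil and keeps the already-configured dumper.
_ = CacheSetDumperPath("/tmp/other_dump")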

engine/cache_dumper.go (Normal file, 148 additions)
View File

@@ -0,0 +1,148 @@
package engine
import (
"bytes"
"compress/zlib"
"io/ioutil"
"os"
"path/filepath"
"github.com/cgrates/cgrates/utils"
"github.com/syndtr/goleveldb/leveldb"
)
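// cacheDumper persists cache entries into one LevelDB database per key
// prefix under path; a nil receiver or an empty path disables dumping.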
type cacheDumper struct {
path string
dbMap map[string]*leveldb.DB
dataEncoder Marshaler
}
func newCacheDumper(path string) (*cacheDumper, error) {
if path != "" {
if err := os.MkdirAll(path, 0766); err != nil {
return nil, err
}
}
return &cacheDumper{
path: path,
dbMap: make(map[string]*leveldb.DB),
dataEncoder: NewCodecMsgpackMarshaler(),
}, nil
}
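// getDumbDb lazily opens and memoizes the LevelDB database backing a
// prefix, returning (nil, nil) when dumping is disabled.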
func (cd *cacheDumper) getDumbDb(prefix string) (*leveldb.DB, error) {
if cd == nil || cd.path == "" {
return nil, nil
}
db, found := cd.dbMap[prefix]
if !found {
var err error
db, err = leveldb.OpenFile(filepath.Join(cd.path, prefix+".cache"), nil)
if err != nil {
return nil, err
}
cd.dbMap[prefix] = db
}
return db, nil
}
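// put msgpack-encodes value and writes it under key in the prefix
// database; payloads over 1000 bytes are zlib-compressed first.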
func (cd *cacheDumper) put(prefix, key string, value interface{}) error {
db, err := cd.getDumbDb(prefix)
if err != nil || db == nil {
return err
}
encData, err := cd.dataEncoder.Marshal(value)
if err != nil {
return err
}
if len(encData) > 1000 {
var buf bytes.Buffer
w := zlib.NewWriter(&buf)
w.Write(encData)
w.Close()
encData = buf.Bytes()
}
return db.Put([]byte(key), encData, nil)
}
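// load rebuilds the in-memory map for a prefix from its LevelDB database,
// inflating values that start with the zlib magic bytes.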
func (cd *cacheDumper) load(prefix string) (map[string]interface{}, error) {
db, err := cd.getDumbDb(prefix)
if err != nil || db == nil {
return nil, err
}
val := make(map[string]interface{})
iter := db.NewIterator(nil, nil)
for iter.Next() {
k := iter.Key()
data := iter.Value()
var encData []byte
if len(data) > 1 && data[0] == 120 && data[1] == 156 { // zlib header bytes 0x78 0x9C
x := bytes.NewBuffer(data)
r, err := zlib.NewReader(x)
if err != nil {
utils.Logger.Info("<cache decoder>: " + err.Error())
break
}
out, err := ioutil.ReadAll(r)
if err != nil {
utils.Logger.Info("<cache decoder>: " + err.Error())
break
}
r.Close()
encData = out
} else {
encData = data
}
v := cd.cacheTypeFactory(prefix)
if err := cd.dataEncoder.Unmarshal(encData, &v); err != nil {
return nil, err
}
val[string(k)] = v
}
iter.Release()
return val, nil
}
func (cd *cacheDumper) delete(prefix, key string) error {
db, err := cd.getDumbDb(prefix)
if err != nil || db == nil {
return err
}
return db.Delete([]byte(key), nil)
}
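// deleteAll closes the prefix database and removes its files from disk.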
func (cd *cacheDumper) deleteAll(prefix string) error {
db, err := cd.getDumbDb(prefix)
if err != nil || db == nil {
return err
}
db.Close()
delete(cd.dbMap, prefix)
return os.RemoveAll(filepath.Join(cd.path, prefix+".cache"))
}
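// cacheTypeFactory returns an empty value of the concrete type stored
// under each prefix, used as the unmarshal target; unknown prefixes yield nil.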
func (cd *cacheDumper) cacheTypeFactory(prefix string) interface{} {
switch prefix {
case utils.DESTINATION_PREFIX:
return make(map[string]struct{})
case utils.RATING_PLAN_PREFIX:
return &RatingPlan{}
case utils.RATING_PROFILE_PREFIX:
return &RatingProfile{}
case utils.LCR_PREFIX:
return &LCR{}
case utils.DERIVEDCHARGERS_PREFIX:
return &utils.DerivedChargers{}
case utils.ACTION_PREFIX:
return Actions{}
case utils.ACTION_PLAN_PREFIX:
return &ActionPlan{}
case utils.SHARED_GROUP_PREFIX:
return &SharedGroup{}
case utils.ALIASES_PREFIX:
return AliasValues{}
case utils.LOADINST_KEY[:PREFIX_LEN]:
return make([]*utils.LoadInstance, 0)
}
return nil
}
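
Taken together, the dumper exposes a small per-prefix persistence API. A minimal round-trip sketch, assuming a throwaway dump directory (the path and key name are hypothetical; error handling is abbreviated):

func exampleDumperRoundTrip() error {
	// Hypothetical dump directory; any writable path works.
	cd, err := newCacheDumper("/tmp/cgr_cache_dump")
	if err != nil {
		return err
	}
	// Mirror a cache write for one rating plan key...
	if err := cd.put(utils.RATING_PLAN_PREFIX, "RP_TEST", &RatingPlan{}); err != nil {
		return err
	}
	// ...then rebuild the whole prefix map, as CacheLoad does.
	entries, err := cd.load(utils.RATING_PLAN_PREFIX)
	if err != nil {
		return err
	}
	_ = entries["RP_TEST"] // decoded back as *RatingPlan
	// Drop the on-disk database, as DeletePrefix does.
	return cd.deleteAll(utils.RATING_PLAN_PREFIX)
}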

View File

@@ -1,29 +0,0 @@
package engine
import "github.com/cgrates/cgrates/utils"
func CacheTypeFactory(prefix string) interface{} {
switch prefix {
case utils.DESTINATION_PREFIX:
return make(map[string]struct{})
case utils.RATING_PLAN_PREFIX:
return &RatingPlan{}
case utils.RATING_PROFILE_PREFIX:
return &RatingProfile{}
case utils.LCR_PREFIX:
return &LCR{}
case utils.DERIVEDCHARGERS_PREFIX:
return &utils.DerivedChargers{}
case utils.ACTION_PREFIX:
return Actions{}
case utils.ACTION_PLAN_PREFIX:
return &ActionPlan{}
case utils.SHARED_GROUP_PREFIX:
return &SharedGroup{}
case utils.ALIASES_PREFIX:
return AliasValues{}
case utils.LOADINST_KEY[:PREFIX_LEN]:
return make([]*utils.LoadInstance, 0)
}
return nil
}

View File

@@ -2,11 +2,7 @@
package engine
import (
"bytes"
"compress/zlib"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
@@ -14,7 +10,6 @@ import (
"time"
"github.com/cgrates/cgrates/utils"
"github.com/syndtr/goleveldb/leveldb"
)
type cacheStore interface {
@@ -27,7 +22,6 @@ type cacheStore interface {
CountEntriesForPrefix(string) int
GetAllForPrefix(string) (map[string]interface{}, error)
GetKeysForPrefix(string) []string
Save(string, map[string][]string, *utils.CacheFileInfo) error
Load(string, []string) error
}
@@ -46,6 +40,9 @@ func (cs cacheDoubleStore) Put(key string, value interface{}) {
cs[prefix] = mp
}
mp[key] = value
if err := dumper.put(prefix, key, value); err != nil {
utils.Logger.Info("<cache dumper> put error: " + err.Error())
}
}
func (cs cacheDoubleStore) Get(key string) (interface{}, error) {
@@ -87,11 +84,18 @@ func (cs cacheDoubleStore) Delete(key string) {
prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:]
if keyMap, ok := cs[prefix]; ok {
delete(keyMap, key)
if err := dumper.delete(prefix, key); err != nil {
utils.Logger.Info("<cache dumper> delete error: " + err.Error())
}
}
}
func (cs cacheDoubleStore) DeletePrefix(prefix string) {
delete(cs, prefix)
if err := dumper.deleteAll(prefix); err != nil {
utils.Logger.Info("<cache dumper> delete all error: " + err.Error())
}
}
func (cs cacheDoubleStore) CountEntriesForPrefix(prefix string) int {
@@ -120,71 +124,6 @@ func (cs cacheDoubleStore) GetKeysForPrefix(prefix string) (keys []string) {
return
}
func (cs cacheDoubleStore) Save(path string, prefixKeysMap map[string][]string, cfi *utils.CacheFileInfo) error {
start := time.Now()
//log.Printf("path: %s prefixes: %v", path, prefixes)
if path == "" || len(prefixKeysMap) == 0 {
return nil
}
//log.Print("saving cache prefixes: ", prefixes)
// create the path
if err := os.MkdirAll(path, 0766); err != nil {
utils.Logger.Info("<cache encoder>:" + err.Error())
return err
}
var wg sync.WaitGroup
var prefixSlice []string
for prefix, keys := range prefixKeysMap {
prefix = prefix[:PREFIX_LEN]
mapValue, found := cs[prefix]
if !found {
continue
}
prefixSlice = append(prefixSlice, prefix)
wg.Add(1)
go func(pref string, ks []string, data map[string]interface{}) {
defer wg.Done()
dataEncoder := NewCodecMsgpackMarshaler()
db, err := leveldb.OpenFile(filepath.Join(path, pref+".cache"), nil)
if err != nil {
log.Fatal(err)
}
defer db.Close()
// destinations are reverse mapped
if pref == utils.DESTINATION_PREFIX {
ks = make([]string, len(cs[utils.DESTINATION_PREFIX]))
i := 0
for dk := range cs[utils.DESTINATION_PREFIX] {
ks[i] = dk
i++
}
}
for _, k := range ks {
v := data[k]
if encData, err := dataEncoder.Marshal(v); err == nil {
if len(encData) > 1000 {
var buf bytes.Buffer
w := zlib.NewWriter(&buf)
w.Write(encData)
w.Close()
encData = buf.Bytes()
}
db.Put([]byte(k), encData, nil)
} else {
utils.Logger.Info("<cache encoder>:" + err.Error())
break
}
}
}(prefix, keys, mapValue)
}
wg.Wait()
utils.Logger.Info(fmt.Sprintf("Cache %v save time: %v", prefixSlice, time.Since(start)))
return utils.SaveCacheFileInfo(path, cfi)
}
func (cs cacheDoubleStore) Load(path string, prefixes []string) error {
if path == "" || len(prefixes) == 0 {
return nil
@@ -199,53 +138,15 @@ func (cs cacheDoubleStore) Load(path string, prefixes []string) error {
continue
}
wg.Add(1)
go func(dirPath, key string) {
go func(dirPath, pref string) {
defer wg.Done()
db, err := leveldb.OpenFile(dirPath, nil)
val, err := dumper.load(pref)
if err != nil {
utils.Logger.Info("<cache decoder>: " + err.Error())
utils.Logger.Info("<cache dumper> load error: " + err.Error())
return
}
defer db.Close()
dataDecoder := NewCodecMsgpackMarshaler()
val := make(map[string]interface{})
iter := db.NewIterator(nil, nil)
for iter.Next() {
// Remember that the contents of the returned slice should not be modified, and
// only valid until the next call to Next.
k := iter.Key()
data := iter.Value()
var encData []byte
if data[0] == 120 && data[1] == 156 { //zip header
x := bytes.NewBuffer(data)
r, err := zlib.NewReader(x)
if err != nil {
//log.Printf("%s err3", key)
utils.Logger.Info("<cache decoder>: " + err.Error())
break
}
out, err := ioutil.ReadAll(r)
if err != nil {
//log.Printf("%s err4", key)
utils.Logger.Info("<cache decoder>: " + err.Error())
break
}
r.Close()
encData = out
} else {
encData = data
}
v := CacheTypeFactory(key)
if err := dataDecoder.Unmarshal(encData, &v); err != nil {
//log.Printf("%s err5", key)
utils.Logger.Info("<cache decoder>: " + err.Error())
break
}
val[string(k)] = v
}
iter.Release()
mux.Lock()
cs[key] = val
cs[pref] = val
mux.Unlock()
}(p, prefix)
}
@@ -379,11 +280,6 @@ func (cs cacheSimpleStore) GetKeysForPrefix(prefix string) (keys []string) {
return
}
func (cs cacheSimpleStore) Save(path string, keys map[string][]string, cfi *utils.CacheFileInfo) error {
utils.Logger.Info("simplestore save")
return nil
}
func (cs cacheSimpleStore) Load(path string, keys []string) error {
utils.Logger.Info("simplestore load")
return nil
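
With entries now persisted one-by-one in Put and Delete, Load reduces to a goroutine fan-out: one loader per prefix, joined with a WaitGroup, each swapping its rebuilt map in under the cache mutex. A condensed sketch of that pattern, assuming the package-level dumper and mux plus a cacheDoubleStore cs and the prefixes argument shown above:

var wg sync.WaitGroup
for _, prefix := range prefixes {
	wg.Add(1)
	go func(pref string) {
		defer wg.Done()
		val, err := dumper.load(pref) // rebuild one prefix from LevelDB
		if err != nil {
			utils.Logger.Info("<cache dumper> load error: " + err.Error())
			return
		}
		mux.Lock()
		cs[pref] = val // replace the whole prefix map at once
		mux.Unlock()
	}(prefix)
}
wg.Wait()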

View File

@@ -287,6 +287,11 @@ func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string, ca
if err = ndb.C(utils.TBLSMCosts).EnsureIndex(index); err != nil {
return nil, err
}
if cacheDumpDir != "" {
if err := CacheSetDumperPath(cacheDumpDir); err != nil {
utils.Logger.Info("<cache dumper> init error: " + err.Error())
}
}
return &MongoStorage{db: db, session: session, ms: NewCodecMsgpackMarshaler(), cacheDumpDir: cacheDumpDir, loadHistorySize: loadHistorySize}, err
}
@@ -665,32 +670,7 @@ func (ms *MongoStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
utils.Logger.Info(fmt.Sprintf("error saving load history: %v (%v)", loadHist, err))
return err
}
keys := make(map[string][]string)
if len(dKeys) > 0 {
keys[utils.DESTINATION_PREFIX] = dKeys
}
if len(rpKeys) > 0 {
keys[utils.RATING_PLAN_PREFIX] = rpKeys
}
if len(rpfKeys) > 0 {
keys[utils.RATING_PROFILE_PREFIX] = rpfKeys
}
if len(lcrKeys) > 0 {
keys[utils.LCR_PREFIX] = lcrKeys
}
if len(actKeys) > 0 {
keys[utils.ACTION_PREFIX] = actKeys
}
if len(dcsKeys) > 0 {
keys[utils.DERIVEDCHARGERS_PREFIX] = dcsKeys
}
if len(aplKeys) > 0 {
keys[utils.ACTION_PLAN_PREFIX] = aplKeys
}
if len(shgKeys) > 0 {
keys[utils.SHARED_GROUP_PREFIX] = shgKeys
}
return CacheSave(ms.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist})
return utils.SaveCacheFileInfo(ms.cacheDumpDir, &utils.CacheFileInfo{Encoding: utils.MSGPACK, LoadInfo: loadHist})
}
func (ms *MongoStorage) CacheAccountingAll() error {
@@ -794,7 +774,7 @@ func (ms *MongoStorage) cacheAccounting(alsKeys []string) (err error) {
utils.Logger.Info(fmt.Sprintf("error saving load history: %v (%v)", loadHist, err))
return err
}
return CacheSave(ms.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist})
return utils.SaveCacheFileInfo(ms.cacheDumpDir, &utils.CacheFileInfo{Encoding: utils.MSGPACK, LoadInfo: loadHist})
}
func (ms *MongoStorage) HasData(category, subject string) (bool, error) {

View File

@@ -74,6 +74,11 @@ func NewRedisStorage(address string, db int, pass, mrshlerStr string, maxConns i
} else {
return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr)
}
if cacheDumpDir != "" {
if err := CacheSetDumperPath(cacheDumpDir); err != nil {
utils.Logger.Info("<cache dumper> init error: " + err.Error())
}
}
return &RedisStorage{db: p, ms: mrshler, cacheDumpDir: cacheDumpDir, loadHistorySize: loadHistorySize}, nil
}
@@ -340,32 +345,7 @@ func (rs *RedisStorage) cacheRating(dKeys, rpKeys, rpfKeys, lcrKeys, dcsKeys, ac
return err
}
keys := make(map[string][]string)
if len(dKeys) > 0 {
keys[utils.DESTINATION_PREFIX] = dKeys
}
if len(rpKeys) > 0 {
keys[utils.RATING_PLAN_PREFIX] = rpKeys
}
if len(rpfKeys) > 0 {
keys[utils.RATING_PROFILE_PREFIX] = rpfKeys
}
if len(lcrKeys) > 0 {
keys[utils.LCR_PREFIX] = lcrKeys
}
if len(actKeys) > 0 {
keys[utils.ACTION_PREFIX] = actKeys
}
if len(dcsKeys) > 0 {
keys[utils.DERIVEDCHARGERS_PREFIX] = dcsKeys
}
if len(aplKeys) > 0 {
keys[utils.ACTION_PLAN_PREFIX] = aplKeys
}
if len(shgKeys) > 0 {
keys[utils.SHARED_GROUP_PREFIX] = shgKeys
}
return CacheSave(rs.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist})
return utils.SaveCacheFileInfo(rs.cacheDumpDir, &utils.CacheFileInfo{Encoding: utils.MSGPACK, LoadInfo: loadHist})
}
func (rs *RedisStorage) CacheAccountingAll() error {
@@ -467,7 +447,7 @@ func (rs *RedisStorage) cacheAccounting(alsKeys []string) (err error) {
utils.Logger.Info(fmt.Sprintf("error saving load history: %v (%v)", loadHist, err))
return err
}
return CacheSave(rs.cacheDumpDir, keys, &utils.CacheFileInfo{Encoding: utils.GOB, LoadInfo: loadHist})
return utils.SaveCacheFileInfo(rs.cacheDumpDir, &utils.CacheFileInfo{Encoding: utils.MSGPACK, LoadInfo: loadHist})
}
// Used to check if specific subject is stored using prefix key attached to entity