turbo charged cache at 2:30am

This commit is contained in:
Radu Ioan Fericean
2014-09-04 02:28:10 +03:00
parent 9467f5741b
commit 7b2ba2aeb9
4 changed files with 115 additions and 103 deletions

View File

@@ -3,9 +3,10 @@ package cache2go
import (
"errors"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/utils"
)
const (
@@ -21,6 +22,10 @@ type timestampedValue struct {
value interface{}
}
func (tsv timestampedValue) Value() interface{} {
return tsv.value
}
type transactionItem struct {
key string
value interface{}
@@ -28,9 +33,8 @@ type transactionItem struct {
}
var (
cache = make(map[string]timestampedValue)
mux sync.RWMutex
counters = make(map[string]int64)
cache = make(cacheStore)
mux sync.RWMutex
// transaction stuff
transactionBuffer []transactionItem
@@ -81,11 +85,7 @@ func Cache(key string, value interface{}) {
defer mux.Unlock()
}
if !transactionON {
if _, ok := cache[key]; !ok {
// only count if the key is not already there
count(key)
}
cache[key] = timestampedValue{time.Now(), value}
cache.Put(key, value)
//fmt.Println("ADD: ", key)
} else {
transactionBuffer = append(transactionBuffer, transactionItem{key: key, value: value, kind: KIND_ADD})
@@ -93,34 +93,15 @@ func Cache(key string, value interface{}) {
}
// Appends to an existing slice in the cache key
func CachePush(key string, val interface{}) {
func CachePush(key string, value interface{}) {
if !transactionLock {
mux.Lock()
defer mux.Unlock()
}
if !transactionON {
var elements []interface{}
if ti, exists := cache[key]; exists {
elements = ti.value.([]interface{})
}
// check if the val is already present
found := false
for _, v := range elements {
if val == v {
found = true
break
}
}
if !found {
elements = append(elements, val)
}
if _, ok := cache[key]; !ok {
// only count if the key is not already there
count(key)
}
cache[key] = timestampedValue{time.Now(), elements}
cache.Append(key, value)
} else {
transactionBuffer = append(transactionBuffer, transactionItem{key: key, value: val, kind: KIND_ADP})
transactionBuffer = append(transactionBuffer, transactionItem{key: key, value: value, kind: KIND_ADP})
}
}
@@ -128,19 +109,13 @@ func CachePush(key string, val interface{}) {
func GetCached(key string) (v interface{}, err error) {
mux.RLock()
defer mux.RUnlock()
if r, ok := cache[key]; ok {
return r.value, nil
}
return nil, errors.New("not found")
return cache.Get(key)
}
func GetKeyAge(key string) (time.Duration, error) {
mux.RLock()
defer mux.RUnlock()
if r, ok := cache[key]; ok {
return time.Since(r.timestamp), nil
}
return 0, errors.New("not found")
return cache.GetAge(key)
}
func RemKey(key string) {
@@ -149,11 +124,7 @@ func RemKey(key string) {
defer mux.Unlock()
}
if !transactionON {
if _, ok := cache[key]; ok {
//fmt.Println("REM: ", key)
delete(cache, key)
descount(key)
}
cache.Delete(key)
} else {
transactionBuffer = append(transactionBuffer, transactionItem{key: key, kind: KIND_REM})
}
@@ -165,15 +136,8 @@ func RemPrefixKey(prefix string) {
defer mux.Unlock()
}
if !transactionON {
for key, _ := range cache {
if strings.HasPrefix(key, prefix) {
//fmt.Println("PRF: ", key)
delete(cache, key)
descount(key)
}
}
cache.DeletePrefix(prefix)
} else {
transactionBuffer = append(transactionBuffer, transactionItem{key: prefix, kind: KIND_PRF})
}
}
@@ -182,61 +146,32 @@ func RemPrefixKey(prefix string) {
func Flush() {
mux.Lock()
defer mux.Unlock()
cache = make(map[string]timestampedValue)
counters = make(map[string]int64)
cache = make(cacheStore)
}
func CountEntries(prefix string) (result int64) {
mux.RLock()
defer mux.RUnlock()
if _, ok := counters[prefix]; ok {
return counters[prefix]
if _, ok := cache[prefix]; ok {
return int64(len(cache[prefix]))
}
return 0
}
// increments the counter for the specified key prefix
func count(key string) {
if len(key) < PREFIX_LEN {
return
}
prefix := key[:PREFIX_LEN]
if _, ok := counters[prefix]; ok {
// increase the value
counters[prefix] += 1
} else {
counters[prefix] = 1
}
}
// decrements the counter for the specified key prefix
func descount(key string) {
if len(key) < PREFIX_LEN {
return
}
prefix := key[:PREFIX_LEN]
if value, ok := counters[prefix]; ok && value > 0 {
counters[prefix] -= 1
}
}
func GetAllEntries(prefix string) map[string]interface{} {
func GetAllEntries(prefix string) (map[string]timestampedValue, error) {
mux.RLock()
defer mux.RUnlock()
result := make(map[string]interface{})
for key, timestampedValue := range cache {
if strings.HasPrefix(key, prefix) {
result[key] = timestampedValue.value
}
if keyMap, ok := cache[prefix]; ok {
return keyMap, nil
}
return result
return nil, errors.New(utils.ERR_NOT_FOUND)
}
func GetEntriesKeys(prefix string) (keys []string) {
mux.RLock()
defer mux.RUnlock()
for key, _ := range cache {
if strings.HasPrefix(key, prefix) {
if keyMap, ok := cache[prefix]; ok {
for key := range keyMap {
keys = append(keys, key)
}
}

View File

@@ -5,7 +5,7 @@ import "testing"
func TestRemKey(t *testing.T) {
Cache("t11_mm", "test")
if t1, err := GetCached("t11_mm"); err != nil || t1 != "test" {
t.Error("Error setting cache")
t.Error("Error setting cache: ", err, t1)
}
RemKey("t11_mm")
if t1, err := GetCached("t11_mm"); err == nil || t1 == "test" {
@@ -75,20 +75,20 @@ func TestTransactionRemBefore(t *testing.T) {
}
func TestRemPrefixKey(t *testing.T) {
Cache("x_t1", "test")
Cache("y_t1", "test")
RemPrefixKey("x_")
_, errX := GetCached("x_t1")
_, errY := GetCached("y_t1")
Cache("xxx_t1", "test")
Cache("yyy_t1", "test")
RemPrefixKey("xxx_")
_, errX := GetCached("xxx_t1")
_, errY := GetCached("yyy_t1")
if errX == nil || errY != nil {
t.Error("Error removing prefix: ", errX, errY)
}
}
func TestCachePush(t *testing.T) {
CachePush("x_t1", "1")
CachePush("x_t1", "2")
v, err := GetCached("x_t1")
CachePush("ccc_t1", "1")
CachePush("ccc_t1", "2")
v, err := GetCached("ccc_t1")
if err != nil || len(v.([]interface{})) != 2 {
t.Error("Error in cache push: ", v)
}

74
cache2go/store.go Normal file
View File

@@ -0,0 +1,74 @@
//Simple caching library with expiration capabilities
package cache2go
import (
"errors"
"time"
"github.com/cgrates/cgrates/utils"
)
type cacheStore map[string]map[string]timestampedValue
func (cs cacheStore) Put(key string, value interface{}) {
prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:]
if _, ok := cs[prefix]; !ok {
cs[prefix] = make(map[string]timestampedValue)
}
cs[prefix][key] = timestampedValue{time.Now(), value}
}
func (cs cacheStore) Append(key string, value interface{}) {
var elements []interface{}
v, err := cs.Get(key)
if err == nil {
elements = v.([]interface{})
}
// check if the val is already present
found := false
for _, v := range elements {
if value == v {
found = true
break
}
}
if !found {
elements = append(elements, value)
}
cache.Put(key, elements)
}
func (cs cacheStore) Get(key string) (interface{}, error) {
prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:]
if keyMap, ok := cs[prefix]; ok {
if ti, exists := keyMap[key]; exists {
return ti.value, nil
}
}
return nil, errors.New(utils.ERR_NOT_FOUND)
}
func (cs cacheStore) GetAge(key string) (time.Duration, error) {
prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:]
if keyMap, ok := cs[prefix]; ok {
if ti, exists := keyMap[key]; exists {
return time.Since(ti.timestamp), nil
}
}
return -1, errors.New(utils.ERR_NOT_FOUND)
}
func (cs cacheStore) Delete(key string) {
prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:]
if keyMap, ok := cs[prefix]; ok {
if _, exists := keyMap[key]; exists {
delete(keyMap, key)
}
}
}
func (cs cacheStore) DeletePrefix(prefix string) {
if _, ok := cs[prefix]; ok {
delete(cs, prefix)
}
}

View File

@@ -85,15 +85,18 @@ func CachedDestHasPrefix(destId, prefix string) bool {
}
func CleanStalePrefixes(destIds []string) {
prefixMap := cache2go.GetAllEntries(DESTINATION_PREFIX)
prefixMap, err := cache2go.GetAllEntries(DESTINATION_PREFIX)
if err != nil {
return
}
for prefix, idIDs := range prefixMap {
dIDs := idIDs.([]interface{})
dIDs := idIDs.Value().([]interface{})
changed := false
for _, searchedDID := range destIds {
if i, found := utils.GetSliceMemberIndex(utils.ConvertInterfaceSliceToStringSlice(dIDs), searchedDID); found {
if len(dIDs) == 1 {
// remove de prefix from cache
cache2go.RemKey(prefix)
cache2go.RemKey(DESTINATION_PREFIX + prefix)
} else {
// delte the testination from list and put the new list in chache
dIDs[i], dIDs = dIDs[len(dIDs)-1], dIDs[:len(dIDs)-1]
@@ -102,7 +105,7 @@ func CleanStalePrefixes(destIds []string) {
}
}
if changed {
cache2go.Cache(prefix, dIDs)
cache2go.Cache(DESTINATION_PREFIX+prefix, dIDs)
}
}
}