From 7b2ba2aeb9f4b8b5eb599f177ca82a111a2af1bd Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 4 Sep 2014 02:28:10 +0300 Subject: [PATCH] turbo charged cache at 2:30am --- cache2go/cache.go | 115 +++++++++-------------------------------- cache2go/cache_test.go | 18 +++---- cache2go/store.go | 74 ++++++++++++++++++++++++++ engine/destinations.go | 11 ++-- 4 files changed, 115 insertions(+), 103 deletions(-) create mode 100644 cache2go/store.go diff --git a/cache2go/cache.go b/cache2go/cache.go index d24b36f2c..46ceac596 100644 --- a/cache2go/cache.go +++ b/cache2go/cache.go @@ -3,9 +3,10 @@ package cache2go import ( "errors" - "strings" "sync" "time" + + "github.com/cgrates/cgrates/utils" ) const ( @@ -21,6 +22,10 @@ type timestampedValue struct { value interface{} } +func (tsv timestampedValue) Value() interface{} { + return tsv.value +} + type transactionItem struct { key string value interface{} @@ -28,9 +33,8 @@ type transactionItem struct { } var ( - cache = make(map[string]timestampedValue) - mux sync.RWMutex - counters = make(map[string]int64) + cache = make(cacheStore) + mux sync.RWMutex // transaction stuff transactionBuffer []transactionItem @@ -81,11 +85,7 @@ func Cache(key string, value interface{}) { defer mux.Unlock() } if !transactionON { - if _, ok := cache[key]; !ok { - // only count if the key is not already there - count(key) - } - cache[key] = timestampedValue{time.Now(), value} + cache.Put(key, value) //fmt.Println("ADD: ", key) } else { transactionBuffer = append(transactionBuffer, transactionItem{key: key, value: value, kind: KIND_ADD}) @@ -93,34 +93,15 @@ func Cache(key string, value interface{}) { } // Appends to an existing slice in the cache key -func CachePush(key string, val interface{}) { +func CachePush(key string, value interface{}) { if !transactionLock { mux.Lock() defer mux.Unlock() } if !transactionON { - var elements []interface{} - if ti, exists := cache[key]; exists { - elements = ti.value.([]interface{}) - } - // check if the val is already present - found := false - for _, v := range elements { - if val == v { - found = true - break - } - } - if !found { - elements = append(elements, val) - } - if _, ok := cache[key]; !ok { - // only count if the key is not already there - count(key) - } - cache[key] = timestampedValue{time.Now(), elements} + cache.Append(key, value) } else { - transactionBuffer = append(transactionBuffer, transactionItem{key: key, value: val, kind: KIND_ADP}) + transactionBuffer = append(transactionBuffer, transactionItem{key: key, value: value, kind: KIND_ADP}) } } @@ -128,19 +109,13 @@ func CachePush(key string, val interface{}) { func GetCached(key string) (v interface{}, err error) { mux.RLock() defer mux.RUnlock() - if r, ok := cache[key]; ok { - return r.value, nil - } - return nil, errors.New("not found") + return cache.Get(key) } func GetKeyAge(key string) (time.Duration, error) { mux.RLock() defer mux.RUnlock() - if r, ok := cache[key]; ok { - return time.Since(r.timestamp), nil - } - return 0, errors.New("not found") + return cache.GetAge(key) } func RemKey(key string) { @@ -149,11 +124,7 @@ func RemKey(key string) { defer mux.Unlock() } if !transactionON { - if _, ok := cache[key]; ok { - //fmt.Println("REM: ", key) - delete(cache, key) - descount(key) - } + cache.Delete(key) } else { transactionBuffer = append(transactionBuffer, transactionItem{key: key, kind: KIND_REM}) } @@ -165,15 +136,8 @@ func RemPrefixKey(prefix string) { defer mux.Unlock() } if !transactionON { - for key, _ := range cache { - if strings.HasPrefix(key, prefix) { - //fmt.Println("PRF: ", key) - delete(cache, key) - descount(key) - } - } + cache.DeletePrefix(prefix) } else { - transactionBuffer = append(transactionBuffer, transactionItem{key: prefix, kind: KIND_PRF}) } } @@ -182,61 +146,32 @@ func RemPrefixKey(prefix string) { func Flush() { mux.Lock() defer mux.Unlock() - cache = make(map[string]timestampedValue) - counters = make(map[string]int64) + cache = make(cacheStore) } func CountEntries(prefix string) (result int64) { mux.RLock() defer mux.RUnlock() - if _, ok := counters[prefix]; ok { - return counters[prefix] + if _, ok := cache[prefix]; ok { + return int64(len(cache[prefix])) } return 0 } -// increments the counter for the specified key prefix -func count(key string) { - if len(key) < PREFIX_LEN { - return - } - prefix := key[:PREFIX_LEN] - if _, ok := counters[prefix]; ok { - // increase the value - counters[prefix] += 1 - } else { - counters[prefix] = 1 - } -} - -// decrements the counter for the specified key prefix -func descount(key string) { - if len(key) < PREFIX_LEN { - return - } - prefix := key[:PREFIX_LEN] - if value, ok := counters[prefix]; ok && value > 0 { - counters[prefix] -= 1 - } -} - -func GetAllEntries(prefix string) map[string]interface{} { +func GetAllEntries(prefix string) (map[string]timestampedValue, error) { mux.RLock() defer mux.RUnlock() - result := make(map[string]interface{}) - for key, timestampedValue := range cache { - if strings.HasPrefix(key, prefix) { - result[key] = timestampedValue.value - } + if keyMap, ok := cache[prefix]; ok { + return keyMap, nil } - return result + return nil, errors.New(utils.ERR_NOT_FOUND) } func GetEntriesKeys(prefix string) (keys []string) { mux.RLock() defer mux.RUnlock() - for key, _ := range cache { - if strings.HasPrefix(key, prefix) { + if keyMap, ok := cache[prefix]; ok { + for key := range keyMap { keys = append(keys, key) } } diff --git a/cache2go/cache_test.go b/cache2go/cache_test.go index 5e27e11c4..7b915ae5a 100644 --- a/cache2go/cache_test.go +++ b/cache2go/cache_test.go @@ -5,7 +5,7 @@ import "testing" func TestRemKey(t *testing.T) { Cache("t11_mm", "test") if t1, err := GetCached("t11_mm"); err != nil || t1 != "test" { - t.Error("Error setting cache") + t.Error("Error setting cache: ", err, t1) } RemKey("t11_mm") if t1, err := GetCached("t11_mm"); err == nil || t1 == "test" { @@ -75,20 +75,20 @@ func TestTransactionRemBefore(t *testing.T) { } func TestRemPrefixKey(t *testing.T) { - Cache("x_t1", "test") - Cache("y_t1", "test") - RemPrefixKey("x_") - _, errX := GetCached("x_t1") - _, errY := GetCached("y_t1") + Cache("xxx_t1", "test") + Cache("yyy_t1", "test") + RemPrefixKey("xxx_") + _, errX := GetCached("xxx_t1") + _, errY := GetCached("yyy_t1") if errX == nil || errY != nil { t.Error("Error removing prefix: ", errX, errY) } } func TestCachePush(t *testing.T) { - CachePush("x_t1", "1") - CachePush("x_t1", "2") - v, err := GetCached("x_t1") + CachePush("ccc_t1", "1") + CachePush("ccc_t1", "2") + v, err := GetCached("ccc_t1") if err != nil || len(v.([]interface{})) != 2 { t.Error("Error in cache push: ", v) } diff --git a/cache2go/store.go b/cache2go/store.go new file mode 100644 index 000000000..b2436e527 --- /dev/null +++ b/cache2go/store.go @@ -0,0 +1,74 @@ +//Simple caching library with expiration capabilities +package cache2go + +import ( + "errors" + "time" + + "github.com/cgrates/cgrates/utils" +) + +type cacheStore map[string]map[string]timestampedValue + +func (cs cacheStore) Put(key string, value interface{}) { + prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:] + if _, ok := cs[prefix]; !ok { + cs[prefix] = make(map[string]timestampedValue) + } + cs[prefix][key] = timestampedValue{time.Now(), value} +} + +func (cs cacheStore) Append(key string, value interface{}) { + var elements []interface{} + v, err := cs.Get(key) + if err == nil { + elements = v.([]interface{}) + } + // check if the val is already present + found := false + for _, v := range elements { + if value == v { + found = true + break + } + } + if !found { + elements = append(elements, value) + } + cache.Put(key, elements) +} + +func (cs cacheStore) Get(key string) (interface{}, error) { + prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:] + if keyMap, ok := cs[prefix]; ok { + if ti, exists := keyMap[key]; exists { + return ti.value, nil + } + } + return nil, errors.New(utils.ERR_NOT_FOUND) +} + +func (cs cacheStore) GetAge(key string) (time.Duration, error) { + prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:] + if keyMap, ok := cs[prefix]; ok { + if ti, exists := keyMap[key]; exists { + return time.Since(ti.timestamp), nil + } + } + return -1, errors.New(utils.ERR_NOT_FOUND) +} + +func (cs cacheStore) Delete(key string) { + prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:] + if keyMap, ok := cs[prefix]; ok { + if _, exists := keyMap[key]; exists { + delete(keyMap, key) + } + } +} + +func (cs cacheStore) DeletePrefix(prefix string) { + if _, ok := cs[prefix]; ok { + delete(cs, prefix) + } +} diff --git a/engine/destinations.go b/engine/destinations.go index 7604b12f7..51973da47 100644 --- a/engine/destinations.go +++ b/engine/destinations.go @@ -85,15 +85,18 @@ func CachedDestHasPrefix(destId, prefix string) bool { } func CleanStalePrefixes(destIds []string) { - prefixMap := cache2go.GetAllEntries(DESTINATION_PREFIX) + prefixMap, err := cache2go.GetAllEntries(DESTINATION_PREFIX) + if err != nil { + return + } for prefix, idIDs := range prefixMap { - dIDs := idIDs.([]interface{}) + dIDs := idIDs.Value().([]interface{}) changed := false for _, searchedDID := range destIds { if i, found := utils.GetSliceMemberIndex(utils.ConvertInterfaceSliceToStringSlice(dIDs), searchedDID); found { if len(dIDs) == 1 { // remove de prefix from cache - cache2go.RemKey(prefix) + cache2go.RemKey(DESTINATION_PREFIX + prefix) } else { // delte the testination from list and put the new list in chache dIDs[i], dIDs = dIDs[len(dIDs)-1], dIDs[:len(dIDs)-1] @@ -102,7 +105,7 @@ func CleanStalePrefixes(destIds []string) { } } if changed { - cache2go.Cache(prefix, dIDs) + cache2go.Cache(DESTINATION_PREFIX+prefix, dIDs) } } }