From 5c9d506d5bb342fe1a8b57f15ea9176bd78d7dd7 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 19 Jul 2016 19:18:59 +0300 Subject: [PATCH] new composite cache lru/ttl --- cache2go/cache.go | 57 ++++++++++++++------------------ cache2go/cache_test.go | 42 ++++++++++++++++++------ cache2go/entry.go | 59 ---------------------------------- engine/cache.go | 10 +++--- engine/cache_store.go | 30 ++++++++--------- engine/storage_interface.go | 5 +-- engine/storage_mongo_datadb.go | 25 ++++++++++++-- engine/storage_redis.go | 42 ++++++++++++++++++++++++ 8 files changed, 146 insertions(+), 124 deletions(-) delete mode 100644 cache2go/entry.go diff --git a/cache2go/cache.go b/cache2go/cache.go index 7d897c5c1..81adf2804 100644 --- a/cache2go/cache.go +++ b/cache2go/cache.go @@ -11,7 +11,7 @@ type Cache struct { mu sync.RWMutex // MaxEntries is the maximum number of cache entries before // an item is evicted. Zero means no limit. - MaxEntries int + maxEntries int // OnEvicted optionally specificies a callback function to be // executed when an entry is purged from the cache. @@ -20,29 +20,27 @@ type Cache struct { ll *list.List cache map[interface{}]*list.Element expiration time.Duration - isTTL bool +} + +type entry struct { + key string + value interface{} + timestamp time.Time } // New creates a new Cache. // If maxEntries is zero, the cache has no limit and it's assumed // that eviction is done by the caller. -func NewLRU(maxEntries int) *Cache { +func New(maxEntries int, expire time.Duration) *Cache { c := &Cache{ - MaxEntries: maxEntries, - ll: list.New(), - cache: make(map[interface{}]*list.Element), - } - return c -} - -func NewTTL(expire time.Duration) *Cache { - c := &Cache{ - ll: list.New(), - cache: make(map[interface{}]*list.Element), + maxEntries: maxEntries, expiration: expire, - isTTL: true, + ll: list.New(), + cache: make(map[interface{}]*list.Element), + } + if c.expiration > 0 { + go c.cleanExpired() } - go c.cleanExpired() return c } @@ -56,7 +54,7 @@ func (c *Cache) cleanExpired() { time.Sleep(c.expiration) continue } - en := e.Value.(*entryTTL) + en := e.Value.(*entry) if en.timestamp.Add(c.expiration).After(time.Now()) { c.mu.Lock() c.removeElement(e) @@ -78,23 +76,18 @@ func (c *Cache) Set(key string, value interface{}) { if e, ok := c.cache[key]; ok { c.ll.MoveToFront(e) - en := e.Value.(entry) - en.SetValue(value) - en.SetTimestamp(time.Now()) + en := e.Value.(*entry) + en.value = value + en.timestamp = time.Now() c.mu.Unlock() return } - var e *list.Element - if c.isTTL { - e = c.ll.PushFront(&entryTTL{key: key, value: value, timestamp: time.Now()}) - } else { - e = c.ll.PushFront(&entryLRU{key: key, value: value}) - } + e := c.ll.PushFront(&entry{key: key, value: value, timestamp: time.Now()}) c.cache[key] = e c.mu.Unlock() - if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries { + if c.maxEntries != 0 && c.ll.Len() > c.maxEntries { c.RemoveOldest() } } @@ -108,8 +101,8 @@ func (c *Cache) Get(key string) (value interface{}, ok bool) { } if e, hit := c.cache[key]; hit { c.ll.MoveToFront(e) - e.Value.(entry).SetTimestamp(time.Now()) - return e.Value.(entry).Value(), true + e.Value.(*entry).timestamp = time.Now() + return e.Value.(*entry).value, true } return } @@ -141,10 +134,10 @@ func (c *Cache) RemoveOldest() { func (c *Cache) removeElement(e *list.Element) { c.ll.Remove(e) - kv := e.Value.(entry) - delete(c.cache, kv.Key()) + kv := e.Value.(*entry) + delete(c.cache, kv.key) if c.OnEvicted != nil { - c.OnEvicted(kv.Key(), kv.Value()) + c.OnEvicted(kv.key, kv.value) } } diff --git a/cache2go/cache_test.go b/cache2go/cache_test.go index cc24e54db..e49b89d16 100644 --- a/cache2go/cache_test.go +++ b/cache2go/cache_test.go @@ -12,7 +12,7 @@ type myStruct struct { } func TestCache(t *testing.T) { - cache := NewTTL(time.Second) + cache := New(0, time.Second) a := &myStruct{data: "mama are mere"} cache.Set("mama", a) b, ok := cache.Get("mama") @@ -22,7 +22,7 @@ func TestCache(t *testing.T) { } func TestCacheExpire(t *testing.T) { - cache := NewTTL(5 * time.Millisecond) + cache := New(0, 5*time.Millisecond) a := &myStruct{data: "mama are mere"} cache.Set("mama", a) b, ok := cache.Get("mama") @@ -37,21 +37,45 @@ func TestCacheExpire(t *testing.T) { } func TestLRU(t *testing.T) { - cache := NewLRU(32) + cache := New(32, 0) for i := 0; i < 40; i++ { cache.Set(fmt.Sprintf("%d", i), i) } if cache.Len() != 32 { t.Error("error dicarding least recently used entry: ", cache.Len()) } - last := cache.ll.Back().Value.(entry).Value().(int) + last := cache.ll.Back().Value.(*entry).value.(int) if last != 8 { t.Error("error dicarding least recently used entry: ", last) } } +func TestLRUandExpire(t *testing.T) { + cache := New(32, 5*time.Millisecond) + for i := 0; i < 40; i++ { + cache.Set(fmt.Sprintf("%d", i), i) + } + if cache.Len() != 32 { + t.Error("error dicarding least recently used entries: ", cache.Len()) + } + last := cache.ll.Back().Value.(*entry).value.(int) + if last != 8 { + t.Error("error dicarding least recently used entry: ", last) + } + time.Sleep(5 * time.Millisecond) + if cache.Len() != 0 { + t.Error("error dicarding expired entries: ", cache.Len()) + } + for i := 0; i < 40; i++ { + cache.Set(fmt.Sprintf("%d", i), i) + } + if cache.Len() != 32 { + t.Error("error dicarding least recently used entries: ", cache.Len()) + } +} + func TestLRUParallel(t *testing.T) { - cache := NewLRU(32) + cache := New(32, 0) wg := sync.WaitGroup{} for i := 0; i < 40; i++ { wg.Add(1) @@ -67,7 +91,7 @@ func TestLRUParallel(t *testing.T) { } func TestFlush(t *testing.T) { - cache := NewTTL(5 * time.Millisecond) + cache := New(0, 5*time.Millisecond) a := &myStruct{data: "mama are mere"} cache.Set("mama", a) time.Sleep(5 * time.Millisecond) @@ -79,7 +103,7 @@ func TestFlush(t *testing.T) { } func TestFlushNoTimeout(t *testing.T) { - cache := NewTTL(5 * time.Millisecond) + cache := New(0, 5*time.Millisecond) a := &myStruct{data: "mama are mere"} cache.Set("mama", a) cache.Flush() @@ -90,7 +114,7 @@ func TestFlushNoTimeout(t *testing.T) { } func TestRemKey(t *testing.T) { - cache := NewLRU(10) + cache := New(10, 0) cache.Set("t11_mm", "test") if t1, ok := cache.Get("t11_mm"); !ok || t1 != "test" { t.Error("Error setting cache") @@ -102,7 +126,7 @@ func TestRemKey(t *testing.T) { } func TestCount(t *testing.T) { - cache := NewTTL(10 * time.Millisecond) + cache := New(0, 10*time.Millisecond) cache.Set("dst_A1", "1") cache.Set("dst_A2", "2") cache.Set("rpf_A3", "3") diff --git a/cache2go/entry.go b/cache2go/entry.go deleted file mode 100644 index d541b47f8..000000000 --- a/cache2go/entry.go +++ /dev/null @@ -1,59 +0,0 @@ -package cache2go - -import "time" - -type entry interface { - Key() string - SetKey(string) - Value() interface{} - SetValue(interface{}) - Timestamp() time.Time - SetTimestamp(time.Time) -} - -type entryLRU struct { - key string - value interface{} -} - -func (lru *entryLRU) Key() string { - return lru.key -} -func (lru *entryLRU) SetKey(k string) { - lru.key = k -} -func (lru *entryLRU) Value() interface{} { - return lru.value -} -func (lru *entryLRU) SetValue(v interface{}) { - lru.value = v -} -func (lru *entryLRU) Timestamp() time.Time { - return time.Time{} -} -func (lru *entryLRU) SetTimestamp(time.Time) {} - -type entryTTL struct { - key string - value interface{} - timestamp time.Time -} - -func (ttl *entryTTL) Key() string { - return ttl.key -} -func (ttl *entryTTL) SetKey(k string) { - ttl.key = k -} -func (ttl *entryTTL) Value() interface{} { - return ttl.value -} -func (ttl *entryTTL) SetValue(v interface{}) { - ttl.value = v -} -func (ttl *entryTTL) Timestamp() time.Time { - return ttl.timestamp -} -func (ttl *entryTTL) SetTimestamp(t time.Time) { - ttl.timestamp = t -} diff --git a/engine/cache.go b/engine/cache.go index b960fd907..d091d7a0f 100644 --- a/engine/cache.go +++ b/engine/cache.go @@ -71,9 +71,9 @@ func CacheCommitTransaction() { case KIND_ADD: CacheSet(item.key, item.value) case KIND_ADP: - CachePush(item.key, item.value.(string)) + CachePush(item.key, item.value.([]string)...) case KIND_POP: - CachePop(item.key, item.value.(string)) + CachePop(item.key, item.value.([]string)...) } } mux.Unlock() @@ -127,15 +127,15 @@ func CachePush(key string, values ...string) { } } -func CachePop(key string, value string) { +func CachePop(key string, values ...string) { if !transactionLock { mux.Lock() defer mux.Unlock() } if !transactionON { - cache.Pop(key, value) + cache.Pop(key, values...) } else { - transactionBuffer = append(transactionBuffer, &transactionItem{key: key, value: value, kind: KIND_POP}) + transactionBuffer = append(transactionBuffer, &transactionItem{key: key, value: values, kind: KIND_POP}) } } diff --git a/engine/cache_store.go b/engine/cache_store.go index 90af3095e..eb7a73ce6 100644 --- a/engine/cache_store.go +++ b/engine/cache_store.go @@ -17,7 +17,7 @@ type cacheStore interface { Put(string, interface{}) Get(string) (interface{}, error) Append(string, ...string) - Pop(string, string) + Pop(string, ...string) Delete(string) DeletePrefix(string) CountEntriesForPrefix(string) int @@ -69,11 +69,13 @@ func (cs cacheDoubleStore) Append(key string, values ...string) { cache.Put(key, elements) } -func (cs cacheDoubleStore) Pop(key string, value string) { +func (cs cacheDoubleStore) Pop(key string, values ...string) { if v, err := cs.Get(key); err == nil { elements, ok := v.(map[string]struct{}) if ok { - delete(elements, value) + for _, value := range values { + delete(elements, value) + } if len(elements) > 0 { cache.Put(key, elements) } else { @@ -164,13 +166,7 @@ type cacheParam struct { } func (ct *cacheParam) createCache() *cache2go.Cache { - if ct.limit > 0 { - return cache2go.NewLRU(ct.limit) - } - if ct.expiration > 0 { - return cache2go.NewTTL(ct.expiration) - } - return cache2go.NewLRU(1000) // sane default + return cache2go.New(ct.limit, ct.expiration) } type cacheLRUTTL map[string]*cache2go.Cache @@ -188,7 +184,7 @@ func (cs cacheLRUTTL) Put(key string, value interface{}) { prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:] mp, ok := cs[prefix] if !ok { - mp = cache2go.NewLRU(1000) + mp = cache2go.New(1000, 0) cs[prefix] = mp } mp.Set(key, value) @@ -220,11 +216,13 @@ func (cs cacheLRUTTL) Append(key string, values ...string) { cache.Put(key, elements) } -func (cs cacheLRUTTL) Pop(key string, value string) { +func (cs cacheLRUTTL) Pop(key string, values ...string) { if v, err := cs.Get(key); err == nil { elements, ok := v.(map[string]struct{}) if ok { - delete(elements, value) + for _, value := range values { + delete(elements, value) + } if len(elements) > 0 { cache.Put(key, elements) } else { @@ -312,11 +310,13 @@ func (cs cacheSimpleStore) Get(key string) (interface{}, error) { return nil, utils.ErrNotFound } -func (cs cacheSimpleStore) Pop(key string, value string) { +func (cs cacheSimpleStore) Pop(key string, values ...string) { if v, err := cs.Get(key); err == nil { elements, ok := v.(map[string]struct{}) if ok { - delete(elements, value) + for _, value := range values { + delete(elements, value) + } if len(elements) > 0 { cache.Put(key, elements) } else { diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 514f11893..b0f94e52d 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -47,9 +47,10 @@ type RatingStorage interface { GetRatingProfile(string, bool) (*RatingProfile, error) SetRatingProfile(*RatingProfile) error RemoveRatingProfile(string) error - GetDestinationIDs(string) ([]string, error) - SetDestinationIDs(*Destination) error + GetDestination(string) (*Destination, error) + SetDestination(*Destination) error RemoveDestination(string) error + //GetReverseDestination(string) ([]string, error) GetLCR(string, bool) (*LCR, error) SetLCR(*LCR) error SetCdrStats(*CdrStats) error diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index a0c55ee0f..fce7ca727 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -975,8 +975,29 @@ func (ms *MongoStorage) GetDestination(key string) (result *Destination, err err } return } - func (ms *MongoStorage) SetDestination(dest *Destination) (err error) { + result, err := ms.ms.Marshal(dest) + if err != nil { + return err + } + var b bytes.Buffer + w := zlib.NewWriter(&b) + w.Write(result) + w.Close() + session, col := ms.conn(colDst) + defer session.Close() + _, err = col.Upsert(bson.M{"key": dest.Id}, &struct { + Key string + Value []byte + }{Key: dest.Id, Value: b.Bytes()}) + if err == nil && historyScribe != nil { + var response int + historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(false), &response) + } + return +} + +/*func (ms *MongoStorage) SetDestination(dest *Destination) (err error) { for _, p := range dest.Prefixes { session, col := ms.conn(colDst) if _, err = col.Upsert(bson.M{"key": p}, &struct { @@ -991,7 +1012,7 @@ func (ms *MongoStorage) SetDestination(dest *Destination) (err error) { historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(false), &response) } return -} +}*/ func (ms *MongoStorage) RemoveDestination(destID string) (err error) { return diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 2bf538510..c86abe268 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -581,6 +581,48 @@ func (rs *RedisStorage) SetLCR(lcr *LCR) (err error) { CacheSet(utils.LCR_PREFIX+lcr.GetId(), lcr) return } +func (rs *RedisStorage) GetDestination(key string) (dest *Destination, err error) { + key = utils.DESTINATION_PREFIX + key + var values []byte + if values, err = rs.db.Cmd("GET", key).Bytes(); len(values) > 0 && err == nil { + b := bytes.NewBuffer(values) + r, err := zlib.NewReader(b) + if err != nil { + return nil, err + } + out, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + r.Close() + dest = new(Destination) + err = rs.ms.Unmarshal(out, dest) + // create optimized structure + for _, p := range dest.Prefixes { + CachePush(utils.DESTINATION_PREFIX+p, dest.Id) + } + } else { + return nil, utils.ErrNotFound + } + return +} + +func (rs *RedisStorage) SetDestination(dest *Destination) (err error) { + result, err := rs.ms.Marshal(dest) + if err != nil { + return err + } + var b bytes.Buffer + w := zlib.NewWriter(&b) + w.Write(result) + w.Close() + err = rs.db.Cmd("SET", utils.DESTINATION_PREFIX+dest.Id, b.Bytes()).Err + if err == nil && historyScribe != nil { + response := 0 + go historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(false), &response) + } + return +} func (rs *RedisStorage) GetDestinationIDs(prefix string) (ids []string, err error) { prefix = utils.DESTINATION_PREFIX + prefix