mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-16 05:39:54 +05:00
new composite cache lru/ttl
This commit is contained in:
@@ -11,7 +11,7 @@ type Cache struct {
|
||||
mu sync.RWMutex
|
||||
// MaxEntries is the maximum number of cache entries before
|
||||
// an item is evicted. Zero means no limit.
|
||||
MaxEntries int
|
||||
maxEntries int
|
||||
|
||||
// OnEvicted optionally specificies a callback function to be
|
||||
// executed when an entry is purged from the cache.
|
||||
@@ -20,29 +20,27 @@ type Cache struct {
|
||||
ll *list.List
|
||||
cache map[interface{}]*list.Element
|
||||
expiration time.Duration
|
||||
isTTL bool
|
||||
}
|
||||
|
||||
type entry struct {
|
||||
key string
|
||||
value interface{}
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
// New creates a new Cache.
|
||||
// If maxEntries is zero, the cache has no limit and it's assumed
|
||||
// that eviction is done by the caller.
|
||||
func NewLRU(maxEntries int) *Cache {
|
||||
func New(maxEntries int, expire time.Duration) *Cache {
|
||||
c := &Cache{
|
||||
MaxEntries: maxEntries,
|
||||
ll: list.New(),
|
||||
cache: make(map[interface{}]*list.Element),
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func NewTTL(expire time.Duration) *Cache {
|
||||
c := &Cache{
|
||||
ll: list.New(),
|
||||
cache: make(map[interface{}]*list.Element),
|
||||
maxEntries: maxEntries,
|
||||
expiration: expire,
|
||||
isTTL: true,
|
||||
ll: list.New(),
|
||||
cache: make(map[interface{}]*list.Element),
|
||||
}
|
||||
if c.expiration > 0 {
|
||||
go c.cleanExpired()
|
||||
}
|
||||
go c.cleanExpired()
|
||||
return c
|
||||
}
|
||||
|
||||
@@ -56,7 +54,7 @@ func (c *Cache) cleanExpired() {
|
||||
time.Sleep(c.expiration)
|
||||
continue
|
||||
}
|
||||
en := e.Value.(*entryTTL)
|
||||
en := e.Value.(*entry)
|
||||
if en.timestamp.Add(c.expiration).After(time.Now()) {
|
||||
c.mu.Lock()
|
||||
c.removeElement(e)
|
||||
@@ -78,23 +76,18 @@ func (c *Cache) Set(key string, value interface{}) {
|
||||
if e, ok := c.cache[key]; ok {
|
||||
c.ll.MoveToFront(e)
|
||||
|
||||
en := e.Value.(entry)
|
||||
en.SetValue(value)
|
||||
en.SetTimestamp(time.Now())
|
||||
en := e.Value.(*entry)
|
||||
en.value = value
|
||||
en.timestamp = time.Now()
|
||||
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
var e *list.Element
|
||||
if c.isTTL {
|
||||
e = c.ll.PushFront(&entryTTL{key: key, value: value, timestamp: time.Now()})
|
||||
} else {
|
||||
e = c.ll.PushFront(&entryLRU{key: key, value: value})
|
||||
}
|
||||
e := c.ll.PushFront(&entry{key: key, value: value, timestamp: time.Now()})
|
||||
c.cache[key] = e
|
||||
c.mu.Unlock()
|
||||
|
||||
if c.MaxEntries != 0 && c.ll.Len() > c.MaxEntries {
|
||||
if c.maxEntries != 0 && c.ll.Len() > c.maxEntries {
|
||||
c.RemoveOldest()
|
||||
}
|
||||
}
|
||||
@@ -108,8 +101,8 @@ func (c *Cache) Get(key string) (value interface{}, ok bool) {
|
||||
}
|
||||
if e, hit := c.cache[key]; hit {
|
||||
c.ll.MoveToFront(e)
|
||||
e.Value.(entry).SetTimestamp(time.Now())
|
||||
return e.Value.(entry).Value(), true
|
||||
e.Value.(*entry).timestamp = time.Now()
|
||||
return e.Value.(*entry).value, true
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -141,10 +134,10 @@ func (c *Cache) RemoveOldest() {
|
||||
|
||||
func (c *Cache) removeElement(e *list.Element) {
|
||||
c.ll.Remove(e)
|
||||
kv := e.Value.(entry)
|
||||
delete(c.cache, kv.Key())
|
||||
kv := e.Value.(*entry)
|
||||
delete(c.cache, kv.key)
|
||||
if c.OnEvicted != nil {
|
||||
c.OnEvicted(kv.Key(), kv.Value())
|
||||
c.OnEvicted(kv.key, kv.value)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ type myStruct struct {
|
||||
}
|
||||
|
||||
func TestCache(t *testing.T) {
|
||||
cache := NewTTL(time.Second)
|
||||
cache := New(0, time.Second)
|
||||
a := &myStruct{data: "mama are mere"}
|
||||
cache.Set("mama", a)
|
||||
b, ok := cache.Get("mama")
|
||||
@@ -22,7 +22,7 @@ func TestCache(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCacheExpire(t *testing.T) {
|
||||
cache := NewTTL(5 * time.Millisecond)
|
||||
cache := New(0, 5*time.Millisecond)
|
||||
a := &myStruct{data: "mama are mere"}
|
||||
cache.Set("mama", a)
|
||||
b, ok := cache.Get("mama")
|
||||
@@ -37,21 +37,45 @@ func TestCacheExpire(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLRU(t *testing.T) {
|
||||
cache := NewLRU(32)
|
||||
cache := New(32, 0)
|
||||
for i := 0; i < 40; i++ {
|
||||
cache.Set(fmt.Sprintf("%d", i), i)
|
||||
}
|
||||
if cache.Len() != 32 {
|
||||
t.Error("error dicarding least recently used entry: ", cache.Len())
|
||||
}
|
||||
last := cache.ll.Back().Value.(entry).Value().(int)
|
||||
last := cache.ll.Back().Value.(*entry).value.(int)
|
||||
if last != 8 {
|
||||
t.Error("error dicarding least recently used entry: ", last)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLRUandExpire(t *testing.T) {
|
||||
cache := New(32, 5*time.Millisecond)
|
||||
for i := 0; i < 40; i++ {
|
||||
cache.Set(fmt.Sprintf("%d", i), i)
|
||||
}
|
||||
if cache.Len() != 32 {
|
||||
t.Error("error dicarding least recently used entries: ", cache.Len())
|
||||
}
|
||||
last := cache.ll.Back().Value.(*entry).value.(int)
|
||||
if last != 8 {
|
||||
t.Error("error dicarding least recently used entry: ", last)
|
||||
}
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
if cache.Len() != 0 {
|
||||
t.Error("error dicarding expired entries: ", cache.Len())
|
||||
}
|
||||
for i := 0; i < 40; i++ {
|
||||
cache.Set(fmt.Sprintf("%d", i), i)
|
||||
}
|
||||
if cache.Len() != 32 {
|
||||
t.Error("error dicarding least recently used entries: ", cache.Len())
|
||||
}
|
||||
}
|
||||
|
||||
func TestLRUParallel(t *testing.T) {
|
||||
cache := NewLRU(32)
|
||||
cache := New(32, 0)
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < 40; i++ {
|
||||
wg.Add(1)
|
||||
@@ -67,7 +91,7 @@ func TestLRUParallel(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFlush(t *testing.T) {
|
||||
cache := NewTTL(5 * time.Millisecond)
|
||||
cache := New(0, 5*time.Millisecond)
|
||||
a := &myStruct{data: "mama are mere"}
|
||||
cache.Set("mama", a)
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
@@ -79,7 +103,7 @@ func TestFlush(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFlushNoTimeout(t *testing.T) {
|
||||
cache := NewTTL(5 * time.Millisecond)
|
||||
cache := New(0, 5*time.Millisecond)
|
||||
a := &myStruct{data: "mama are mere"}
|
||||
cache.Set("mama", a)
|
||||
cache.Flush()
|
||||
@@ -90,7 +114,7 @@ func TestFlushNoTimeout(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRemKey(t *testing.T) {
|
||||
cache := NewLRU(10)
|
||||
cache := New(10, 0)
|
||||
cache.Set("t11_mm", "test")
|
||||
if t1, ok := cache.Get("t11_mm"); !ok || t1 != "test" {
|
||||
t.Error("Error setting cache")
|
||||
@@ -102,7 +126,7 @@ func TestRemKey(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCount(t *testing.T) {
|
||||
cache := NewTTL(10 * time.Millisecond)
|
||||
cache := New(0, 10*time.Millisecond)
|
||||
cache.Set("dst_A1", "1")
|
||||
cache.Set("dst_A2", "2")
|
||||
cache.Set("rpf_A3", "3")
|
||||
|
||||
@@ -1,59 +0,0 @@
|
||||
package cache2go
|
||||
|
||||
import "time"
|
||||
|
||||
type entry interface {
|
||||
Key() string
|
||||
SetKey(string)
|
||||
Value() interface{}
|
||||
SetValue(interface{})
|
||||
Timestamp() time.Time
|
||||
SetTimestamp(time.Time)
|
||||
}
|
||||
|
||||
type entryLRU struct {
|
||||
key string
|
||||
value interface{}
|
||||
}
|
||||
|
||||
func (lru *entryLRU) Key() string {
|
||||
return lru.key
|
||||
}
|
||||
func (lru *entryLRU) SetKey(k string) {
|
||||
lru.key = k
|
||||
}
|
||||
func (lru *entryLRU) Value() interface{} {
|
||||
return lru.value
|
||||
}
|
||||
func (lru *entryLRU) SetValue(v interface{}) {
|
||||
lru.value = v
|
||||
}
|
||||
func (lru *entryLRU) Timestamp() time.Time {
|
||||
return time.Time{}
|
||||
}
|
||||
func (lru *entryLRU) SetTimestamp(time.Time) {}
|
||||
|
||||
type entryTTL struct {
|
||||
key string
|
||||
value interface{}
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
func (ttl *entryTTL) Key() string {
|
||||
return ttl.key
|
||||
}
|
||||
func (ttl *entryTTL) SetKey(k string) {
|
||||
ttl.key = k
|
||||
}
|
||||
func (ttl *entryTTL) Value() interface{} {
|
||||
return ttl.value
|
||||
}
|
||||
func (ttl *entryTTL) SetValue(v interface{}) {
|
||||
ttl.value = v
|
||||
}
|
||||
func (ttl *entryTTL) Timestamp() time.Time {
|
||||
return ttl.timestamp
|
||||
}
|
||||
func (ttl *entryTTL) SetTimestamp(t time.Time) {
|
||||
ttl.timestamp = t
|
||||
}
|
||||
@@ -71,9 +71,9 @@ func CacheCommitTransaction() {
|
||||
case KIND_ADD:
|
||||
CacheSet(item.key, item.value)
|
||||
case KIND_ADP:
|
||||
CachePush(item.key, item.value.(string))
|
||||
CachePush(item.key, item.value.([]string)...)
|
||||
case KIND_POP:
|
||||
CachePop(item.key, item.value.(string))
|
||||
CachePop(item.key, item.value.([]string)...)
|
||||
}
|
||||
}
|
||||
mux.Unlock()
|
||||
@@ -127,15 +127,15 @@ func CachePush(key string, values ...string) {
|
||||
}
|
||||
}
|
||||
|
||||
func CachePop(key string, value string) {
|
||||
func CachePop(key string, values ...string) {
|
||||
if !transactionLock {
|
||||
mux.Lock()
|
||||
defer mux.Unlock()
|
||||
}
|
||||
if !transactionON {
|
||||
cache.Pop(key, value)
|
||||
cache.Pop(key, values...)
|
||||
} else {
|
||||
transactionBuffer = append(transactionBuffer, &transactionItem{key: key, value: value, kind: KIND_POP})
|
||||
transactionBuffer = append(transactionBuffer, &transactionItem{key: key, value: values, kind: KIND_POP})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ type cacheStore interface {
|
||||
Put(string, interface{})
|
||||
Get(string) (interface{}, error)
|
||||
Append(string, ...string)
|
||||
Pop(string, string)
|
||||
Pop(string, ...string)
|
||||
Delete(string)
|
||||
DeletePrefix(string)
|
||||
CountEntriesForPrefix(string) int
|
||||
@@ -69,11 +69,13 @@ func (cs cacheDoubleStore) Append(key string, values ...string) {
|
||||
cache.Put(key, elements)
|
||||
}
|
||||
|
||||
func (cs cacheDoubleStore) Pop(key string, value string) {
|
||||
func (cs cacheDoubleStore) Pop(key string, values ...string) {
|
||||
if v, err := cs.Get(key); err == nil {
|
||||
elements, ok := v.(map[string]struct{})
|
||||
if ok {
|
||||
delete(elements, value)
|
||||
for _, value := range values {
|
||||
delete(elements, value)
|
||||
}
|
||||
if len(elements) > 0 {
|
||||
cache.Put(key, elements)
|
||||
} else {
|
||||
@@ -164,13 +166,7 @@ type cacheParam struct {
|
||||
}
|
||||
|
||||
func (ct *cacheParam) createCache() *cache2go.Cache {
|
||||
if ct.limit > 0 {
|
||||
return cache2go.NewLRU(ct.limit)
|
||||
}
|
||||
if ct.expiration > 0 {
|
||||
return cache2go.NewTTL(ct.expiration)
|
||||
}
|
||||
return cache2go.NewLRU(1000) // sane default
|
||||
return cache2go.New(ct.limit, ct.expiration)
|
||||
}
|
||||
|
||||
type cacheLRUTTL map[string]*cache2go.Cache
|
||||
@@ -188,7 +184,7 @@ func (cs cacheLRUTTL) Put(key string, value interface{}) {
|
||||
prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:]
|
||||
mp, ok := cs[prefix]
|
||||
if !ok {
|
||||
mp = cache2go.NewLRU(1000)
|
||||
mp = cache2go.New(1000, 0)
|
||||
cs[prefix] = mp
|
||||
}
|
||||
mp.Set(key, value)
|
||||
@@ -220,11 +216,13 @@ func (cs cacheLRUTTL) Append(key string, values ...string) {
|
||||
cache.Put(key, elements)
|
||||
}
|
||||
|
||||
func (cs cacheLRUTTL) Pop(key string, value string) {
|
||||
func (cs cacheLRUTTL) Pop(key string, values ...string) {
|
||||
if v, err := cs.Get(key); err == nil {
|
||||
elements, ok := v.(map[string]struct{})
|
||||
if ok {
|
||||
delete(elements, value)
|
||||
for _, value := range values {
|
||||
delete(elements, value)
|
||||
}
|
||||
if len(elements) > 0 {
|
||||
cache.Put(key, elements)
|
||||
} else {
|
||||
@@ -312,11 +310,13 @@ func (cs cacheSimpleStore) Get(key string) (interface{}, error) {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
|
||||
func (cs cacheSimpleStore) Pop(key string, value string) {
|
||||
func (cs cacheSimpleStore) Pop(key string, values ...string) {
|
||||
if v, err := cs.Get(key); err == nil {
|
||||
elements, ok := v.(map[string]struct{})
|
||||
if ok {
|
||||
delete(elements, value)
|
||||
for _, value := range values {
|
||||
delete(elements, value)
|
||||
}
|
||||
if len(elements) > 0 {
|
||||
cache.Put(key, elements)
|
||||
} else {
|
||||
|
||||
@@ -47,9 +47,10 @@ type RatingStorage interface {
|
||||
GetRatingProfile(string, bool) (*RatingProfile, error)
|
||||
SetRatingProfile(*RatingProfile) error
|
||||
RemoveRatingProfile(string) error
|
||||
GetDestinationIDs(string) ([]string, error)
|
||||
SetDestinationIDs(*Destination) error
|
||||
GetDestination(string) (*Destination, error)
|
||||
SetDestination(*Destination) error
|
||||
RemoveDestination(string) error
|
||||
//GetReverseDestination(string) ([]string, error)
|
||||
GetLCR(string, bool) (*LCR, error)
|
||||
SetLCR(*LCR) error
|
||||
SetCdrStats(*CdrStats) error
|
||||
|
||||
@@ -975,8 +975,29 @@ func (ms *MongoStorage) GetDestination(key string) (result *Destination, err err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetDestination(dest *Destination) (err error) {
|
||||
result, err := ms.ms.Marshal(dest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var b bytes.Buffer
|
||||
w := zlib.NewWriter(&b)
|
||||
w.Write(result)
|
||||
w.Close()
|
||||
session, col := ms.conn(colDst)
|
||||
defer session.Close()
|
||||
_, err = col.Upsert(bson.M{"key": dest.Id}, &struct {
|
||||
Key string
|
||||
Value []byte
|
||||
}{Key: dest.Id, Value: b.Bytes()})
|
||||
if err == nil && historyScribe != nil {
|
||||
var response int
|
||||
historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(false), &response)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
/*func (ms *MongoStorage) SetDestination(dest *Destination) (err error) {
|
||||
for _, p := range dest.Prefixes {
|
||||
session, col := ms.conn(colDst)
|
||||
if _, err = col.Upsert(bson.M{"key": p}, &struct {
|
||||
@@ -991,7 +1012,7 @@ func (ms *MongoStorage) SetDestination(dest *Destination) (err error) {
|
||||
historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(false), &response)
|
||||
}
|
||||
return
|
||||
}
|
||||
}*/
|
||||
|
||||
func (ms *MongoStorage) RemoveDestination(destID string) (err error) {
|
||||
return
|
||||
|
||||
@@ -581,6 +581,48 @@ func (rs *RedisStorage) SetLCR(lcr *LCR) (err error) {
|
||||
CacheSet(utils.LCR_PREFIX+lcr.GetId(), lcr)
|
||||
return
|
||||
}
|
||||
func (rs *RedisStorage) GetDestination(key string) (dest *Destination, err error) {
|
||||
key = utils.DESTINATION_PREFIX + key
|
||||
var values []byte
|
||||
if values, err = rs.db.Cmd("GET", key).Bytes(); len(values) > 0 && err == nil {
|
||||
b := bytes.NewBuffer(values)
|
||||
r, err := zlib.NewReader(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out, err := ioutil.ReadAll(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.Close()
|
||||
dest = new(Destination)
|
||||
err = rs.ms.Unmarshal(out, dest)
|
||||
// create optimized structure
|
||||
for _, p := range dest.Prefixes {
|
||||
CachePush(utils.DESTINATION_PREFIX+p, dest.Id)
|
||||
}
|
||||
} else {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetDestination(dest *Destination) (err error) {
|
||||
result, err := rs.ms.Marshal(dest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var b bytes.Buffer
|
||||
w := zlib.NewWriter(&b)
|
||||
w.Write(result)
|
||||
w.Close()
|
||||
err = rs.db.Cmd("SET", utils.DESTINATION_PREFIX+dest.Id, b.Bytes()).Err
|
||||
if err == nil && historyScribe != nil {
|
||||
response := 0
|
||||
go historyScribe.Call("HistoryV1.Record", dest.GetHistoryRecord(false), &response)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetDestinationIDs(prefix string) (ids []string, err error) {
|
||||
prefix = utils.DESTINATION_PREFIX + prefix
|
||||
|
||||
Reference in New Issue
Block a user