mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-16 21:59:53 +05:00
simplified cache
This commit is contained in:
@@ -8,105 +8,21 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type expiringCacheEntry interface {
|
||||
XCache(key string, expire time.Duration, value expiringCacheEntry)
|
||||
timer() *time.Timer
|
||||
age() time.Duration
|
||||
KeepAlive()
|
||||
}
|
||||
|
||||
// Structure that must be embeded in the objectst that must be cached with expiration.
|
||||
// If the expiration is not needed this can be ignored
|
||||
type XEntry struct {
|
||||
sync.Mutex
|
||||
key string
|
||||
keepAlive bool
|
||||
expireDuration time.Duration
|
||||
timestamp time.Time
|
||||
t *time.Timer
|
||||
}
|
||||
const (
|
||||
PREFIX_LEN = 4
|
||||
)
|
||||
|
||||
type timestampedValue struct {
|
||||
timestamp time.Time
|
||||
value interface{}
|
||||
}
|
||||
|
||||
const (
|
||||
PREFIX_LEN = 4
|
||||
)
|
||||
|
||||
var (
|
||||
xcache = make(map[string]expiringCacheEntry)
|
||||
xMux sync.RWMutex
|
||||
cache = make(map[string]timestampedValue)
|
||||
mux sync.RWMutex
|
||||
cMux sync.Mutex
|
||||
counters = make(map[string]int64)
|
||||
)
|
||||
|
||||
// The main function to cache with expiration
|
||||
func (xe *XEntry) XCache(key string, expire time.Duration, value expiringCacheEntry) {
|
||||
xe.keepAlive = true
|
||||
xe.key = key
|
||||
xe.expireDuration = expire
|
||||
xe.timestamp = time.Now()
|
||||
xMux.Lock()
|
||||
if _, ok := xcache[key]; !ok {
|
||||
// only count if the key is not already there
|
||||
count(key)
|
||||
}
|
||||
xcache[key] = value
|
||||
xMux.Unlock()
|
||||
go xe.expire()
|
||||
}
|
||||
|
||||
// The internal mechanism for expiartion
|
||||
func (xe *XEntry) expire() {
|
||||
for xe.keepAlive {
|
||||
xe.Lock()
|
||||
xe.keepAlive = false
|
||||
xe.Unlock()
|
||||
xe.t = time.NewTimer(xe.expireDuration)
|
||||
<-xe.t.C
|
||||
if !xe.keepAlive {
|
||||
xMux.Lock()
|
||||
if _, ok := xcache[xe.key]; ok {
|
||||
delete(xcache, xe.key)
|
||||
descount(xe.key)
|
||||
}
|
||||
xMux.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Getter for the timer
|
||||
func (xe *XEntry) timer() *time.Timer {
|
||||
return xe.t
|
||||
}
|
||||
|
||||
func (xe *XEntry) age() time.Duration {
|
||||
return time.Since(xe.timestamp)
|
||||
|
||||
}
|
||||
|
||||
// Mark entry to be kept another expirationDuration period
|
||||
func (xe *XEntry) KeepAlive() {
|
||||
xe.Lock()
|
||||
defer xe.Unlock()
|
||||
xe.keepAlive = true
|
||||
}
|
||||
|
||||
// Get an entry from the expiration cache and mark it for keeping alive
|
||||
func GetXCached(key string) (ece expiringCacheEntry, err error) {
|
||||
xMux.RLock()
|
||||
defer xMux.RUnlock()
|
||||
if r, ok := xcache[key]; ok {
|
||||
r.KeepAlive()
|
||||
return r, nil
|
||||
}
|
||||
return nil, errors.New("not found")
|
||||
}
|
||||
|
||||
// The function to be used to cache a key/value pair when expiration is not needed
|
||||
func Cache(key string, value interface{}) {
|
||||
mux.Lock()
|
||||
@@ -134,96 +50,52 @@ func GetKeyAge(key string) (time.Duration, error) {
|
||||
if r, ok := cache[key]; ok {
|
||||
return time.Since(r.timestamp), nil
|
||||
}
|
||||
xMux.RLock()
|
||||
defer xMux.RUnlock()
|
||||
if r, ok := xcache[key]; ok {
|
||||
return r.age(), nil
|
||||
}
|
||||
return 0, errors.New("not found")
|
||||
}
|
||||
|
||||
func RemKey(key string) {
|
||||
mux.Lock()
|
||||
defer mux.Unlock()
|
||||
if _, ok := cache[key]; ok {
|
||||
delete(cache, key)
|
||||
descount(key)
|
||||
}
|
||||
mux.Unlock()
|
||||
xMux.Lock()
|
||||
if r, ok := xcache[key]; ok {
|
||||
if r.timer() != nil {
|
||||
r.timer().Stop()
|
||||
}
|
||||
}
|
||||
if _, ok := xcache[key]; ok {
|
||||
delete(xcache, key)
|
||||
descount(key)
|
||||
}
|
||||
xMux.Unlock()
|
||||
}
|
||||
|
||||
func RemPrefixKey(prefix string) {
|
||||
mux.Lock()
|
||||
defer mux.Unlock()
|
||||
for key, _ := range cache {
|
||||
if strings.HasPrefix(key, prefix) {
|
||||
delete(cache, key)
|
||||
descount(key)
|
||||
}
|
||||
}
|
||||
mux.Unlock()
|
||||
xMux.Lock()
|
||||
for key, _ := range xcache {
|
||||
if strings.HasPrefix(key, prefix) {
|
||||
if r, ok := xcache[key]; ok {
|
||||
if r.timer() != nil {
|
||||
r.timer().Stop()
|
||||
}
|
||||
}
|
||||
delete(xcache, key)
|
||||
descount(key)
|
||||
}
|
||||
}
|
||||
xMux.Unlock()
|
||||
}
|
||||
|
||||
func GetAllEntries(prefix string) map[string]interface{} {
|
||||
mux.Lock()
|
||||
defer mux.Unlock()
|
||||
result := make(map[string]interface{})
|
||||
for key, timestampedValue := range cache {
|
||||
if strings.HasPrefix(key, prefix) {
|
||||
result[key] = timestampedValue.value
|
||||
}
|
||||
}
|
||||
mux.Unlock()
|
||||
xMux.Lock()
|
||||
for key, value := range xcache {
|
||||
if strings.HasPrefix(key, prefix) {
|
||||
result[key] = value
|
||||
}
|
||||
}
|
||||
xMux.Unlock()
|
||||
return result
|
||||
}
|
||||
|
||||
// Delete all keys from cache
|
||||
func Flush() {
|
||||
mux.Lock()
|
||||
defer mux.Unlock()
|
||||
cache = make(map[string]timestampedValue)
|
||||
mux.Unlock()
|
||||
xMux.Lock()
|
||||
for _, v := range xcache {
|
||||
if v.timer() != nil {
|
||||
v.timer().Stop()
|
||||
}
|
||||
}
|
||||
xcache = make(map[string]expiringCacheEntry)
|
||||
xMux.Unlock()
|
||||
cMux.Lock()
|
||||
counters = make(map[string]int64)
|
||||
cMux.Unlock()
|
||||
}
|
||||
|
||||
func CountEntries(prefix string) (result int64) {
|
||||
mux.RLock()
|
||||
defer mux.RUnlock()
|
||||
if _, ok := counters[prefix]; ok {
|
||||
return counters[prefix]
|
||||
}
|
||||
@@ -235,8 +107,6 @@ func count(key string) {
|
||||
if len(key) < PREFIX_LEN {
|
||||
return
|
||||
}
|
||||
cMux.Lock()
|
||||
defer cMux.Unlock()
|
||||
prefix := key[:PREFIX_LEN]
|
||||
if _, ok := counters[prefix]; ok {
|
||||
// increase the value
|
||||
@@ -251,8 +121,6 @@ func descount(key string) {
|
||||
if len(key) < PREFIX_LEN {
|
||||
return
|
||||
}
|
||||
cMux.Lock()
|
||||
defer cMux.Unlock()
|
||||
prefix := key[:PREFIX_LEN]
|
||||
if value, ok := counters[prefix]; ok && value > 0 {
|
||||
counters[prefix] -= 1
|
||||
@@ -269,14 +137,3 @@ func GetEntriesKeys(prefix string) (keys []string) {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func XGetEntriesKeys(prefix string) (keys []string) {
|
||||
xMux.RLock()
|
||||
defer xMux.RUnlock()
|
||||
for key, _ := range xcache {
|
||||
if strings.HasPrefix(key, prefix) {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1,78 +1,6 @@
|
||||
package cache2go
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type myStruct struct {
|
||||
XEntry
|
||||
data string
|
||||
}
|
||||
|
||||
func TestCache(t *testing.T) {
|
||||
a := &myStruct{data: "mama are mere"}
|
||||
a.XCache("mama", 1*time.Second, a)
|
||||
b, err := GetXCached("mama")
|
||||
if err != nil || b == nil || b != a {
|
||||
t.Error("Error retriving data from cache", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCacheExpire(t *testing.T) {
|
||||
a := &myStruct{data: "mama are mere"}
|
||||
a.XCache("mama", 1*time.Second, a)
|
||||
b, err := GetXCached("mama")
|
||||
if err != nil || b == nil || b.(*myStruct).data != "mama are mere" {
|
||||
t.Error("Error retriving data from cache", err)
|
||||
}
|
||||
time.Sleep(1001 * time.Millisecond)
|
||||
b, err = GetXCached("mama")
|
||||
if err == nil || b != nil {
|
||||
t.Error("Error expiring data")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCacheKeepAlive(t *testing.T) {
|
||||
a := &myStruct{data: "mama are mere"}
|
||||
a.XCache("mama", 1*time.Second, a)
|
||||
b, err := GetXCached("mama")
|
||||
if err != nil || b == nil || b.(*myStruct).data != "mama are mere" {
|
||||
t.Error("Error retriving data from cache", err)
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
b.KeepAlive()
|
||||
time.Sleep(501 * time.Millisecond)
|
||||
if err != nil {
|
||||
t.Error("Error keeping cached data alive", err)
|
||||
}
|
||||
time.Sleep(1000 * time.Millisecond)
|
||||
b, err = GetXCached("mama")
|
||||
if err == nil || b != nil {
|
||||
t.Error("Error expiring data")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlush(t *testing.T) {
|
||||
a := &myStruct{data: "mama are mere"}
|
||||
a.XCache("mama", 10*time.Second, a)
|
||||
time.Sleep(1000 * time.Millisecond)
|
||||
Flush()
|
||||
b, err := GetXCached("mama")
|
||||
if err == nil || b != nil {
|
||||
t.Error("Error expiring data")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlushNoTimout(t *testing.T) {
|
||||
a := &myStruct{data: "mama are mere"}
|
||||
a.XCache("mama", 10*time.Second, a)
|
||||
Flush()
|
||||
b, err := GetXCached("mama")
|
||||
if err == nil || b != nil {
|
||||
t.Error("Error expiring data")
|
||||
}
|
||||
}
|
||||
import "testing"
|
||||
|
||||
func TestRemKey(t *testing.T) {
|
||||
Cache("t11_mm", "test")
|
||||
@@ -85,39 +13,6 @@ func TestRemKey(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestXRemKey(t *testing.T) {
|
||||
a := &myStruct{data: "mama are mere"}
|
||||
a.XCache("mama", 10*time.Second, a)
|
||||
if t1, err := GetXCached("mama"); err != nil || t1 != a {
|
||||
t.Error("Error setting xcache")
|
||||
}
|
||||
RemKey("mama")
|
||||
if t1, err := GetXCached("mama"); err == nil || t1 == a {
|
||||
t.Error("Error removing xcached key: ", err, t1)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
These tests sometimes fails on drone.io
|
||||
func TestGetKeyAge(t *testing.T) {
|
||||
Cache("t1", "test")
|
||||
d, err := GetKeyAge("t1")
|
||||
if err != nil || d > time.Millisecond || d < time.Nanosecond {
|
||||
t.Error("Error getting cache key age: ", d)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func TestXGetKeyAge(t *testing.T) {
|
||||
a := &myStruct{data: "mama are mere"}
|
||||
a.XCache("t1", 10*time.Second, a)
|
||||
d, err := GetXKeyAge("t1")
|
||||
if err != nil || d > time.Millisecond || d < time.Nanosecond {
|
||||
t.Error("Error getting cache key age: ", d)
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
func TestRemPrefixKey(t *testing.T) {
|
||||
Cache("x_t1", "test")
|
||||
Cache("y_t1", "test")
|
||||
@@ -129,19 +24,6 @@ func TestRemPrefixKey(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestXRemPrefixKey(t *testing.T) {
|
||||
a := &myStruct{data: "mama are mere"}
|
||||
a.XCache("x_t1", 10*time.Second, a)
|
||||
a.XCache("y_t1", 10*time.Second, a)
|
||||
|
||||
RemPrefixKey("x_")
|
||||
_, errX := GetXCached("x_t1")
|
||||
_, errY := GetXCached("y_t1")
|
||||
if errX == nil || errY != nil {
|
||||
t.Error("Error removing prefix: ", errX, errY)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCount(t *testing.T) {
|
||||
Cache("dst_A1", "1")
|
||||
Cache("dst_A2", "2")
|
||||
|
||||
@@ -186,7 +186,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
return errors.New("No database connection!")
|
||||
}
|
||||
if flush {
|
||||
dataStorage.(Storage).Flush()
|
||||
dataStorage.Flush()
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Destinations:")
|
||||
|
||||
@@ -133,7 +133,7 @@ func (dbr *DbReader) ShowStatistics() {
|
||||
func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
storage := dbr.dataDb
|
||||
if flush {
|
||||
storage.(Storage).Flush()
|
||||
storage.Flush()
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Destinations")
|
||||
|
||||
Reference in New Issue
Block a user