diff --git a/cache2go/cache.go b/cache2go/cache.go index 582f0a1d7..44377499d 100644 --- a/cache2go/cache.go +++ b/cache2go/cache.go @@ -8,105 +8,21 @@ import ( "time" ) -type expiringCacheEntry interface { - XCache(key string, expire time.Duration, value expiringCacheEntry) - timer() *time.Timer - age() time.Duration - KeepAlive() -} - -// Structure that must be embeded in the objectst that must be cached with expiration. -// If the expiration is not needed this can be ignored -type XEntry struct { - sync.Mutex - key string - keepAlive bool - expireDuration time.Duration - timestamp time.Time - t *time.Timer -} +const ( + PREFIX_LEN = 4 +) type timestampedValue struct { timestamp time.Time value interface{} } -const ( - PREFIX_LEN = 4 -) - var ( - xcache = make(map[string]expiringCacheEntry) - xMux sync.RWMutex cache = make(map[string]timestampedValue) mux sync.RWMutex - cMux sync.Mutex counters = make(map[string]int64) ) -// The main function to cache with expiration -func (xe *XEntry) XCache(key string, expire time.Duration, value expiringCacheEntry) { - xe.keepAlive = true - xe.key = key - xe.expireDuration = expire - xe.timestamp = time.Now() - xMux.Lock() - if _, ok := xcache[key]; !ok { - // only count if the key is not already there - count(key) - } - xcache[key] = value - xMux.Unlock() - go xe.expire() -} - -// The internal mechanism for expiartion -func (xe *XEntry) expire() { - for xe.keepAlive { - xe.Lock() - xe.keepAlive = false - xe.Unlock() - xe.t = time.NewTimer(xe.expireDuration) - <-xe.t.C - if !xe.keepAlive { - xMux.Lock() - if _, ok := xcache[xe.key]; ok { - delete(xcache, xe.key) - descount(xe.key) - } - xMux.Unlock() - } - } -} - -// Getter for the timer -func (xe *XEntry) timer() *time.Timer { - return xe.t -} - -func (xe *XEntry) age() time.Duration { - return time.Since(xe.timestamp) - -} - -// Mark entry to be kept another expirationDuration period -func (xe *XEntry) KeepAlive() { - xe.Lock() - defer xe.Unlock() - xe.keepAlive = true -} - -// Get an entry from the expiration cache and mark it for keeping alive -func GetXCached(key string) (ece expiringCacheEntry, err error) { - xMux.RLock() - defer xMux.RUnlock() - if r, ok := xcache[key]; ok { - r.KeepAlive() - return r, nil - } - return nil, errors.New("not found") -} - // The function to be used to cache a key/value pair when expiration is not needed func Cache(key string, value interface{}) { mux.Lock() @@ -134,96 +50,52 @@ func GetKeyAge(key string) (time.Duration, error) { if r, ok := cache[key]; ok { return time.Since(r.timestamp), nil } - xMux.RLock() - defer xMux.RUnlock() - if r, ok := xcache[key]; ok { - return r.age(), nil - } return 0, errors.New("not found") } func RemKey(key string) { mux.Lock() + defer mux.Unlock() if _, ok := cache[key]; ok { delete(cache, key) descount(key) } - mux.Unlock() - xMux.Lock() - if r, ok := xcache[key]; ok { - if r.timer() != nil { - r.timer().Stop() - } - } - if _, ok := xcache[key]; ok { - delete(xcache, key) - descount(key) - } - xMux.Unlock() } func RemPrefixKey(prefix string) { mux.Lock() + defer mux.Unlock() for key, _ := range cache { if strings.HasPrefix(key, prefix) { delete(cache, key) descount(key) } } - mux.Unlock() - xMux.Lock() - for key, _ := range xcache { - if strings.HasPrefix(key, prefix) { - if r, ok := xcache[key]; ok { - if r.timer() != nil { - r.timer().Stop() - } - } - delete(xcache, key) - descount(key) - } - } - xMux.Unlock() } func GetAllEntries(prefix string) map[string]interface{} { mux.Lock() + defer mux.Unlock() result := make(map[string]interface{}) for key, timestampedValue := range cache { if strings.HasPrefix(key, prefix) { result[key] = timestampedValue.value } } - mux.Unlock() - xMux.Lock() - for key, value := range xcache { - if strings.HasPrefix(key, prefix) { - result[key] = value - } - } - xMux.Unlock() return result } // Delete all keys from cache func Flush() { mux.Lock() + defer mux.Unlock() cache = make(map[string]timestampedValue) - mux.Unlock() - xMux.Lock() - for _, v := range xcache { - if v.timer() != nil { - v.timer().Stop() - } - } - xcache = make(map[string]expiringCacheEntry) - xMux.Unlock() - cMux.Lock() counters = make(map[string]int64) - cMux.Unlock() } func CountEntries(prefix string) (result int64) { + mux.RLock() + defer mux.RUnlock() if _, ok := counters[prefix]; ok { return counters[prefix] } @@ -235,8 +107,6 @@ func count(key string) { if len(key) < PREFIX_LEN { return } - cMux.Lock() - defer cMux.Unlock() prefix := key[:PREFIX_LEN] if _, ok := counters[prefix]; ok { // increase the value @@ -251,8 +121,6 @@ func descount(key string) { if len(key) < PREFIX_LEN { return } - cMux.Lock() - defer cMux.Unlock() prefix := key[:PREFIX_LEN] if value, ok := counters[prefix]; ok && value > 0 { counters[prefix] -= 1 @@ -269,14 +137,3 @@ func GetEntriesKeys(prefix string) (keys []string) { } return } - -func XGetEntriesKeys(prefix string) (keys []string) { - xMux.RLock() - defer xMux.RUnlock() - for key, _ := range xcache { - if strings.HasPrefix(key, prefix) { - keys = append(keys, key) - } - } - return -} diff --git a/cache2go/cache_test.go b/cache2go/cache_test.go index d5b8d597e..7ea119196 100644 --- a/cache2go/cache_test.go +++ b/cache2go/cache_test.go @@ -1,78 +1,6 @@ package cache2go -import ( - "testing" - "time" -) - -type myStruct struct { - XEntry - data string -} - -func TestCache(t *testing.T) { - a := &myStruct{data: "mama are mere"} - a.XCache("mama", 1*time.Second, a) - b, err := GetXCached("mama") - if err != nil || b == nil || b != a { - t.Error("Error retriving data from cache", err) - } -} - -func TestCacheExpire(t *testing.T) { - a := &myStruct{data: "mama are mere"} - a.XCache("mama", 1*time.Second, a) - b, err := GetXCached("mama") - if err != nil || b == nil || b.(*myStruct).data != "mama are mere" { - t.Error("Error retriving data from cache", err) - } - time.Sleep(1001 * time.Millisecond) - b, err = GetXCached("mama") - if err == nil || b != nil { - t.Error("Error expiring data") - } -} - -func TestCacheKeepAlive(t *testing.T) { - a := &myStruct{data: "mama are mere"} - a.XCache("mama", 1*time.Second, a) - b, err := GetXCached("mama") - if err != nil || b == nil || b.(*myStruct).data != "mama are mere" { - t.Error("Error retriving data from cache", err) - } - time.Sleep(500 * time.Millisecond) - b.KeepAlive() - time.Sleep(501 * time.Millisecond) - if err != nil { - t.Error("Error keeping cached data alive", err) - } - time.Sleep(1000 * time.Millisecond) - b, err = GetXCached("mama") - if err == nil || b != nil { - t.Error("Error expiring data") - } -} - -func TestFlush(t *testing.T) { - a := &myStruct{data: "mama are mere"} - a.XCache("mama", 10*time.Second, a) - time.Sleep(1000 * time.Millisecond) - Flush() - b, err := GetXCached("mama") - if err == nil || b != nil { - t.Error("Error expiring data") - } -} - -func TestFlushNoTimout(t *testing.T) { - a := &myStruct{data: "mama are mere"} - a.XCache("mama", 10*time.Second, a) - Flush() - b, err := GetXCached("mama") - if err == nil || b != nil { - t.Error("Error expiring data") - } -} +import "testing" func TestRemKey(t *testing.T) { Cache("t11_mm", "test") @@ -85,39 +13,6 @@ func TestRemKey(t *testing.T) { } } -func TestXRemKey(t *testing.T) { - a := &myStruct{data: "mama are mere"} - a.XCache("mama", 10*time.Second, a) - if t1, err := GetXCached("mama"); err != nil || t1 != a { - t.Error("Error setting xcache") - } - RemKey("mama") - if t1, err := GetXCached("mama"); err == nil || t1 == a { - t.Error("Error removing xcached key: ", err, t1) - } -} - -/* -These tests sometimes fails on drone.io -func TestGetKeyAge(t *testing.T) { - Cache("t1", "test") - d, err := GetKeyAge("t1") - if err != nil || d > time.Millisecond || d < time.Nanosecond { - t.Error("Error getting cache key age: ", d) - } -} - - -func TestXGetKeyAge(t *testing.T) { - a := &myStruct{data: "mama are mere"} - a.XCache("t1", 10*time.Second, a) - d, err := GetXKeyAge("t1") - if err != nil || d > time.Millisecond || d < time.Nanosecond { - t.Error("Error getting cache key age: ", d) - } -} -*/ - func TestRemPrefixKey(t *testing.T) { Cache("x_t1", "test") Cache("y_t1", "test") @@ -129,19 +24,6 @@ func TestRemPrefixKey(t *testing.T) { } } -func TestXRemPrefixKey(t *testing.T) { - a := &myStruct{data: "mama are mere"} - a.XCache("x_t1", 10*time.Second, a) - a.XCache("y_t1", 10*time.Second, a) - - RemPrefixKey("x_") - _, errX := GetXCached("x_t1") - _, errY := GetXCached("y_t1") - if errX == nil || errY != nil { - t.Error("Error removing prefix: ", errX, errY) - } -} - func TestCount(t *testing.T) { Cache("dst_A1", "1") Cache("dst_A2", "2") diff --git a/engine/loader_csv.go b/engine/loader_csv.go index 1f38567ad..5d4a5fba5 100644 --- a/engine/loader_csv.go +++ b/engine/loader_csv.go @@ -186,7 +186,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) { return errors.New("No database connection!") } if flush { - dataStorage.(Storage).Flush() + dataStorage.Flush() } if verbose { log.Print("Destinations:") diff --git a/engine/loader_db.go b/engine/loader_db.go index 0f610422a..d4b96dbfd 100644 --- a/engine/loader_db.go +++ b/engine/loader_db.go @@ -133,7 +133,7 @@ func (dbr *DbReader) ShowStatistics() { func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) { storage := dbr.dataDb if flush { - storage.(Storage).Flush() + storage.Flush() } if verbose { log.Print("Destinations")