From 07a62f4ca34d551c9c801400abb05c3c5fce9e3c Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 12 May 2018 13:49:36 +0200 Subject: [PATCH] Simplified guardian with channels --- guardian/guardian.go | 139 ++++++++++++++------------------------ guardian/guardian_test.go | 49 +++++++------- 2 files changed, 73 insertions(+), 115 deletions(-) diff --git a/guardian/guardian.go b/guardian/guardian.go index f3ee3033d..f1b417d54 100644 --- a/guardian/guardian.go +++ b/guardian/guardian.go @@ -19,7 +19,6 @@ along with this program. If not, see package guardian import ( - "fmt" "sync" "time" ) @@ -27,90 +26,56 @@ import ( // global package variable var Guardian = &GuardianLocker{locksMap: make(map[string]*itemLock)} -func newItemLock(keyID string) (il *itemLock) { - il = &itemLock{keyID: keyID} - il.lock() // need to return it already locked so we don't have concurrency on creation/unlock - return -} - -// itemLock represents one lock with key autodestroy type itemLock struct { - keyID string // store it so we know what to destroy - cnt int64 - cntLck sync.Mutex // protect the counter - lk sync.Mutex // real lock -} - -// lock() executes combined lock with increasing counter -func (il *itemLock) lock() { - il.cntLck.Lock() - il.cnt += 1 - il.cntLck.Unlock() - il.lk.Lock() -} - -// unlock() executes combined lock with autoremoving lock from Guardian -func (il *itemLock) unlock() { - il.cntLck.Lock() - if il.cnt < 1 { // already unlocked - fmt.Sprintf(" itemLock with id: %s with counter smaller than 0", il.keyID) - il.cntLck.Unlock() - return - } - il.cnt -= 1 - if il.cnt == 0 { // last lock in the queue - Guardian.Lock() - delete(Guardian.locksMap, il.keyID) - Guardian.Unlock() - } - il.lk.Unlock() - il.cntLck.Unlock() -} - -type itemLocks []*itemLock - -func (ils itemLocks) lock() { - for _, itmLock := range ils { - itmLock.lock() - } -} - -func (ils itemLocks) unlock() { - for _, itmLock := range ils { - itmLock.unlock() - } + lk chan struct{} + cnt int64 } // GuardianLocker is an optimized locking system per locking key type GuardianLocker struct { - locksMap map[string]*itemLock - sync.RWMutex // protects the maps + locksMap map[string]*itemLock + sync.Mutex // protects the map } -// lockItems locks a set of lockIDs -// returning the lock structs so they can be later unlocked -func (guard *GuardianLocker) lockItems(lockIDs []string) (itmLocks itemLocks) { - guard.Lock() - var toLockItms itemLocks - for _, lockID := range lockIDs { - itmLock, exists := guard.locksMap[lockID] - if !exists { - itmLock = newItemLock(lockID) - guard.locksMap[lockID] = itmLock - } else { - toLockItms = append(toLockItms, itmLock) - } - itmLocks = append(itmLocks, itmLock) +func (gl *GuardianLocker) lockItem(itmID string) { + gl.Lock() + itmLock, exists := gl.locksMap[itmID] + if !exists { + itmLock = &itemLock{lk: make(chan struct{}, 1)} + gl.locksMap[itmID] = itmLock + itmLock.lk <- struct{}{} } - guard.Unlock() - toLockItms.lock() - return + itmLock.cnt++ + select { + case <-itmLock.lk: + gl.Unlock() + return + default: // move further so we can unlock + } + gl.Unlock() + <-itmLock.lk +} + +func (gl *GuardianLocker) unlockItem(itmID string) { + gl.Lock() + itmLock, exists := gl.locksMap[itmID] + if !exists { + gl.Unlock() + return + } + itmLock.cnt-- + if itmLock.cnt == 0 { + delete(gl.locksMap, itmID) + } + gl.Unlock() + itmLock.lk <- struct{}{} } // Guard executes the handler between locks -func (guard *GuardianLocker) Guard(handler func() (interface{}, error), timeout time.Duration, lockIDs ...string) (reply interface{}, err error) { - itmLocks := guard.lockItems(lockIDs) - +func (gl *GuardianLocker) Guard(handler func() (interface{}, error), timeout time.Duration, lockIDs ...string) (reply interface{}, err error) { + for _, lockID := range lockIDs { + gl.lockItem(lockID) + } rplyChan := make(chan interface{}) errChan := make(chan error) go func(rplyChan chan interface{}, errChan chan error) { @@ -121,7 +86,6 @@ func (guard *GuardianLocker) Guard(handler func() (interface{}, error), timeout rplyChan <- rply } }(rplyChan, errChan) - if timeout > 0 { // wait with timeout select { case err = <-errChan: @@ -134,35 +98,30 @@ func (guard *GuardianLocker) Guard(handler func() (interface{}, error), timeout case reply = <-rplyChan: } } - - itmLocks.unlock() + for _, lockID := range lockIDs { + gl.unlockItem(lockID) + } return } // GuardTimed aquires a lock for duration -func (guard *GuardianLocker) GuardIDs(timeout time.Duration, lockIDs ...string) { - guard.lockItems(lockIDs) +func (gl *GuardianLocker) GuardIDs(timeout time.Duration, lockIDs ...string) { + for _, lockID := range lockIDs { + gl.lockItem(lockID) + } if timeout != 0 { go func(timeout time.Duration, lockIDs ...string) { time.Sleep(timeout) - guard.UnguardIDs(lockIDs...) + gl.UnguardIDs(lockIDs...) }(timeout, lockIDs...) } return } // UnguardTimed attempts to unlock a set of locks based on their locksUUID -func (guard *GuardianLocker) UnguardIDs(lockIDs ...string) { - var itmLocks itemLocks - guard.RLock() +func (gl *GuardianLocker) UnguardIDs(lockIDs ...string) { for _, lockID := range lockIDs { - var itmLock *itemLock - itmLock, exists := Guardian.locksMap[lockID] - if exists { - itmLocks = append(itmLocks, itmLock) - } + gl.unlockItem(lockID) } - guard.RUnlock() - itmLocks.unlock() return } diff --git a/guardian/guardian_test.go b/guardian/guardian_test.go index 5a52f9a84..093267d00 100644 --- a/guardian/guardian_test.go +++ b/guardian/guardian_test.go @@ -19,7 +19,6 @@ package guardian import ( "sync" - "sync/atomic" "testing" "time" ) @@ -49,13 +48,13 @@ func TestGuardianMultipleKeys(t *testing.T) { if execTime := time.Now().Sub(tStart); execTime < mustExecDur || execTime > mustExecDur+time.Duration(20*time.Millisecond) { t.Errorf("Execution took: %v", execTime) } - Guardian.RLock() + Guardian.Lock() for _, key := range keys { if _, hasKey := Guardian.locksMap[key]; hasKey { - t.Error("Possible memleak") + t.Errorf("Possible memleak for key: %s", key) } } - Guardian.RUnlock() + Guardian.Unlock() } func TestGuardianTimeout(t *testing.T) { @@ -77,13 +76,13 @@ func TestGuardianTimeout(t *testing.T) { if execTime := time.Now().Sub(tStart); execTime < mustExecDur || execTime > mustExecDur+time.Duration(20*time.Millisecond) { t.Errorf("Execution took: %v", execTime) } - Guardian.RLock() + Guardian.Lock() for _, key := range keys { if _, hasKey := Guardian.locksMap[key]; hasKey { t.Error("Possible memleak") } } - Guardian.RUnlock() + Guardian.Unlock() } func TestGuardianGuardIDs(t *testing.T) { @@ -91,27 +90,27 @@ func TestGuardianGuardIDs(t *testing.T) { //lock with 3 keys lockIDs := []string{"test1", "test2", "test3"} // make sure the keys are not in guardian before lock - Guardian.RLock() + Guardian.Lock() for _, lockID := range lockIDs { if _, hasKey := Guardian.locksMap[lockID]; hasKey { t.Errorf("Unexpected lockID found: %s", lockID) } } - Guardian.RUnlock() + Guardian.Unlock() // lock 3 items tStart := time.Now() lockDur := 2 * time.Millisecond Guardian.GuardIDs(lockDur, lockIDs...) - Guardian.RLock() + Guardian.Lock() for _, lockID := range lockIDs { if itmLock, hasKey := Guardian.locksMap[lockID]; !hasKey { t.Errorf("Cannot find lock for lockID: %s", lockID) - } else if atomic.LoadInt64(&itmLock.cnt) != 1 { + } else if itmLock.cnt != 1 { t.Errorf("Unexpected itmLock found: %+v", itmLock) } } - Guardian.RUnlock() + Guardian.Unlock() secLockDur := time.Duration(1 * time.Millisecond) // second lock to test counter @@ -119,29 +118,29 @@ func TestGuardianGuardIDs(t *testing.T) { time.Sleep(20 * time.Microsecond) // give time for goroutine to lock // check if counters were properly increased - Guardian.RLock() + Guardian.Lock() lkID := lockIDs[0] eCnt := int64(1) if itmLock, hasKey := Guardian.locksMap[lkID]; !hasKey { t.Errorf("Cannot find lock for lockID: %s", lkID) - } else if cnt := atomic.LoadInt64(&itmLock.cnt); cnt != eCnt { - t.Errorf("Unexpected counter: %d for itmLock with id %s", cnt, lkID) + } else if itmLock.cnt != eCnt { + t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID) } lkID = lockIDs[1] eCnt = int64(2) if itmLock, hasKey := Guardian.locksMap[lkID]; !hasKey { t.Errorf("Cannot find lock for lockID: %s", lkID) - } else if cnt := atomic.LoadInt64(&itmLock.cnt); cnt != eCnt { - t.Errorf("Unexpected counter: %d for itmLock with id %s", cnt, lkID) + } else if itmLock.cnt != eCnt { + t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID) } lkID = lockIDs[2] eCnt = int64(1) // we did not manage to increase it yet since it did not pass first lock if itmLock, hasKey := Guardian.locksMap[lkID]; !hasKey { t.Errorf("Cannot find lock for lockID: %s", lkID) - } else if cnt := atomic.LoadInt64(&itmLock.cnt); cnt != eCnt { - t.Errorf("Unexpected counter: %d for itmLock with id %s", cnt, lkID) + } else if itmLock.cnt != eCnt { + t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID) } - Guardian.RUnlock() + Guardian.Unlock() time.Sleep(lockDur + secLockDur + time.Millisecond) // give time to unlock before proceeding @@ -160,28 +159,28 @@ func TestGuardianGuardIDs(t *testing.T) { time.Sleep(time.Duration(30) * time.Millisecond) // making sure the items stay locked - Guardian.RLock() + Guardian.Lock() if len(Guardian.locksMap) != 3 { t.Errorf("locksMap should be have 3 elements, have: %+v", Guardian.locksMap) } for _, lkID := range lockIDs { if itmLock, hasKey := Guardian.locksMap[lkID]; !hasKey { t.Errorf("Cannot find lock for lockID: %s", lkID) - } else if cnt := atomic.LoadInt64(&itmLock.cnt); cnt != 1 { - t.Errorf("Unexpected counter: %d for itmLock with id %s", cnt, lkID) + } else if itmLock.cnt != 1 { + t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID) } } - Guardian.RUnlock() + Guardian.Unlock() Guardian.UnguardIDs(lockIDs...) time.Sleep(time.Duration(50) * time.Millisecond) // make sure items were unlocked - Guardian.RLock() + Guardian.Lock() if len(Guardian.locksMap) != 0 { t.Errorf("locksMap should have 0 elements, has: %+v", Guardian.locksMap) } - Guardian.RUnlock() + Guardian.Unlock() } func BenchmarkGuard(b *testing.B) {