diff --git a/guardian/guardian.go b/guardian/guardian.go index 244d8bae0..fb354df82 100644 --- a/guardian/guardian.go +++ b/guardian/guardian.go @@ -32,7 +32,7 @@ var Guardian = &GuardianLocker{ refs: make(map[string][]string)} type itemLock struct { - lk chan struct{} + lk chan struct{} //better with mutex cnt int64 } @@ -78,7 +78,7 @@ func (gl *GuardianLocker) unlockItem(itmID string) { delete(gl.locks, itmID) } gl.lkMux.Unlock() - itmLock.lk <- struct{}{} + itmLock.lk <- struct{}{} //the unlock should be above the gl.Lock } // lockWithReference will perform locks and also generate a lock reference for it (so it can be used when remotely locking) @@ -111,7 +111,8 @@ func (gl *GuardianLocker) lockWithReference(refID string, lkIDs []string) string func (gl *GuardianLocker) unlockWithReference(refID string) (lkIDs []string) { gl.lockItem(refID) gl.refsMux.Lock() - lkIDs, has := gl.refs[refID] + lkIDs, has := gl.refs[refID] // this value is local and not sent back + if !has { gl.refsMux.Unlock() gl.unlockItem(refID) @@ -127,11 +128,11 @@ func (gl *GuardianLocker) unlockWithReference(refID string) (lkIDs []string) { } // Guard executes the handler between locks -func (gl *GuardianLocker) Guard(handler func() (interface{}, error), timeout time.Duration, lockIDs ...string) (reply interface{}, err error) { +func (gl *GuardianLocker) Guard(handler func() (interface{}, error), timeout time.Duration, lockIDs ...string) (reply interface{}, err error) { // do we need the interface here as a reply? for _, lockID := range lockIDs { gl.lockItem(lockID) } - rplyChan := make(chan interface{}) + rplyChan := make(chan interface{}) // make them buffered in order to not have a gorutine sitting on just because there is nobody to read from them errChan := make(chan error) go func(rplyChan chan interface{}, errChan chan error) { // execute @@ -157,6 +158,7 @@ func (gl *GuardianLocker) Guard(handler func() (interface{}, error), timeout tim for _, lockID := range lockIDs { gl.unlockItem(lockID) } + // consider closing the return chanels if there is no timout return } @@ -164,7 +166,7 @@ func (gl *GuardianLocker) Guard(handler func() (interface{}, error), timeout tim // returns the reference ID for the lock group aquired func (gl *GuardianLocker) GuardIDs(refID string, timeout time.Duration, lkIDs ...string) (retRefID string) { retRefID = gl.lockWithReference(refID, lkIDs) - if timeout != 0 && retRefID != "" { + if timeout != 0 && retRefID != "" { // we should consider using time.AfterFunc and store the timer go func() { time.Sleep(timeout) lkIDs := gl.unlockWithReference(retRefID) diff --git a/guardian/guardian2.go b/guardian/guardian2.go new file mode 100644 index 000000000..4752d1888 --- /dev/null +++ b/guardian/guardian2.go @@ -0,0 +1,180 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package guardian + +import ( + "fmt" + "sync" + "time" + + "github.com/cgrates/cgrates/utils" +) + +// Guardian is the global package variable +var Guardian2 = &GuardianLocker2{ + locks: make(map[string]*itemLock2), + refs: make(map[string]*refObj)} + +type itemLock2 struct { + sync.Mutex + cnt int64 +} + +type refObj struct { + refs []string + tm *time.Timer +} + +// GuardianLocker2 is an optimized locking system per locking key +type GuardianLocker2 struct { + locks map[string]*itemLock2 + lkMux sync.Mutex // protects the locks + refs map[string]*refObj // used in case of remote locks + refsMux sync.RWMutex // protects the map +} + +func (gl *GuardianLocker2) lockItem(itmID string) { + if itmID == "" { + return + } + gl.lkMux.Lock() + itmLock, exists := gl.locks[itmID] + if !exists { + itmLock = new(itemLock2) + gl.locks[itmID] = itmLock + } + itmLock.cnt++ + gl.lkMux.Unlock() + itmLock.Lock() +} + +func (gl *GuardianLocker2) unlockItem(itmID string) { + gl.lkMux.Lock() + itmLock, exists := gl.locks[itmID] + if !exists { + gl.lkMux.Unlock() + return + } + itmLock.cnt-- + if itmLock.cnt == 0 { + delete(gl.locks, itmID) + } + itmLock.Unlock() + gl.lkMux.Unlock() +} + +// lockWithReference will perform locks and also generate a lock reference for it (so it can be used when remotely locking) +func (gl *GuardianLocker2) lockWithReference(refID string, timeout time.Duration, lkIDs ...string) string { + var refEmpty bool + if refID == "" { + refEmpty = true + refID = utils.GenUUID() + } + gl.lockItem(refID) // make sure we only process one simultaneous refID at the time, otherwise checking already used refID is not reliable + gl.refsMux.Lock() + if !refEmpty { + if _, has := gl.refs[refID]; has { + gl.refsMux.Unlock() + gl.unlockItem(refID) + return "" // no locking was done + } + } + var tm *time.Timer + if timeout != 0 { + tm = time.AfterFunc(timeout, func() { + if lkIDs := gl.unlockWithReference(refID); len(lkIDs) != 0 { + utils.Logger.Warning(fmt.Sprintf(" force timing-out locks: %+v", lkIDs)) + } + }) + } + gl.refs[refID] = &refObj{ + refs: lkIDs, + tm: tm, + } + gl.refsMux.Unlock() + // execute the real locks + for _, lk := range lkIDs { + gl.lockItem(lk) + } + gl.unlockItem(refID) + return refID +} + +// unlockWithReference will unlock based on the reference ID +func (gl *GuardianLocker2) unlockWithReference(refID string) (lkIDs []string) { + gl.lockItem(refID) + gl.refsMux.Lock() + ref, has := gl.refs[refID] + if !has { + gl.refsMux.Unlock() + gl.unlockItem(refID) + return + } + if ref.tm != nil { + ref.tm.Stop() + } + delete(gl.refs, refID) + gl.refsMux.Unlock() + lkIDs = ref.refs + for _, lk := range lkIDs { + gl.unlockItem(lk) + } + gl.unlockItem(refID) + return +} + +// Guard executes the handler between locks +func (gl *GuardianLocker2) Guard(handler func() error, timeout time.Duration, lockIDs ...string) (err error) { // do we need the interface here as a reply? + for _, lockID := range lockIDs { + gl.lockItem(lockID) + } + errChan := make(chan error, 1) + go func() { + errChan <- handler() + }() + if timeout > 0 { // wait with timeout + select { + case err = <-errChan: + close(errChan) + case <-time.After(timeout): + utils.Logger.Warning(fmt.Sprintf(" force timing-out locks: %+v", lockIDs)) + } + } else { // a bit dangerous but wait till handler finishes + err = <-errChan + close(errChan) + } + for _, lockID := range lockIDs { + gl.unlockItem(lockID) + } + return +} + +// GuardIDs aquires a lock for duration +// returns the reference ID for the lock group aquired +func (gl *GuardianLocker2) GuardIDs(refID string, timeout time.Duration, lkIDs ...string) string { + return gl.lockWithReference(refID, timeout, lkIDs...) +} + +// UnguardIDs attempts to unlock a set of locks based on their reference ID received on lock +func (gl *GuardianLocker2) UnguardIDs(refID string) (_ []string) { + if refID == "" { + return + } + return gl.unlockWithReference(refID) +} diff --git a/guardian/guardian2_test.go b/guardian/guardian2_test.go new file mode 100644 index 000000000..842238943 --- /dev/null +++ b/guardian/guardian2_test.go @@ -0,0 +1,346 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package guardian + +import ( + "reflect" + "strconv" + "sync" + "testing" + "time" + + "github.com/cgrates/cgrates/utils" +) + +func delayHandler2() error { + time.Sleep(100 * time.Millisecond) + return nil +} + +// Forks 3 groups of workers and makes sure that the time for execution is the one we expect for all 15 goroutines (with 100ms ) +func TestGuardian2MultipleKeys(t *testing.T) { + tStart := time.Now() + maxIter := 5 + sg := new(sync.WaitGroup) + keys := []string{"test1", "test2", "test3"} + for i := 0; i < maxIter; i++ { + for _, key := range keys { + sg.Add(1) + go func(key string) { + Guardian2.Guard(delayHandler2, 0, key) + sg.Done() + }(key) + } + } + sg.Wait() + mustExecDur := time.Duration(maxIter*100) * time.Millisecond + if execTime := time.Now().Sub(tStart); execTime < mustExecDur || + execTime > mustExecDur+100*time.Millisecond { + t.Errorf("Execution took: %v", execTime) + } + Guardian2.lkMux.Lock() + for _, key := range keys { + if _, hasKey := Guardian2.locks[key]; hasKey { + t.Errorf("Possible memleak for key: %s", key) + } + } + Guardian2.lkMux.Unlock() +} + +func TestGuardian2Timeout(t *testing.T) { + tStart := time.Now() + maxIter := 5 + sg := new(sync.WaitGroup) + keys := []string{"test1", "test2", "test3"} + for i := 0; i < maxIter; i++ { + for _, key := range keys { + sg.Add(1) + go func(key string) { + Guardian2.Guard(delayHandler2, 10*time.Millisecond, key) + sg.Done() + }(key) + } + } + sg.Wait() + mustExecDur := time.Duration(maxIter*10) * time.Millisecond + if execTime := time.Now().Sub(tStart); execTime < mustExecDur || + execTime > mustExecDur+100*time.Millisecond { + t.Errorf("Execution took: %v", execTime) + } + Guardian2.lkMux.Lock() + for _, key := range keys { + if _, hasKey := Guardian2.locks[key]; hasKey { + t.Error("Possible memleak") + } + } + Guardian2.lkMux.Unlock() +} + +func TestGuardian2GuardIDs(t *testing.T) { + + //lock with 3 keys + lockIDs := []string{"test1", "test2", "test3"} + // make sure the keys are not in guardian before lock + Guardian2.lkMux.Lock() + for _, lockID := range lockIDs { + if _, hasKey := Guardian2.locks[lockID]; hasKey { + t.Errorf("Unexpected lockID found: %s", lockID) + } + } + Guardian2.lkMux.Unlock() + // lock 3 items + tStart := time.Now() + lockDur := 2 * time.Millisecond + Guardian2.GuardIDs("", lockDur, lockIDs...) + Guardian2.lkMux.Lock() + for _, lockID := range lockIDs { + if itmLock, hasKey := Guardian2.locks[lockID]; !hasKey { + t.Errorf("Cannot find lock for lockID: %s", lockID) + } else if itmLock.cnt != 1 { + t.Errorf("Unexpected itmLock found: %+v", itmLock) + } + } + Guardian2.lkMux.Unlock() + secLockDur := time.Millisecond + // second lock to test counter + go Guardian2.GuardIDs("", secLockDur, lockIDs[1:]...) + time.Sleep(30 * time.Microsecond) // give time for goroutine to lock + // check if counters were properly increased + Guardian2.lkMux.Lock() + lkID := lockIDs[0] + eCnt := int64(1) + if itmLock, hasKey := Guardian2.locks[lkID]; !hasKey { + t.Errorf("Cannot find lock for lockID: %s", lkID) + } else if itmLock.cnt != eCnt { + t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID) + } + lkID = lockIDs[1] + eCnt = int64(2) + if itmLock, hasKey := Guardian2.locks[lkID]; !hasKey { + t.Errorf("Cannot find lock for lockID: %s", lkID) + } else if itmLock.cnt != eCnt { + t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID) + } + lkID = lockIDs[2] + eCnt = int64(1) // we did not manage to increase it yet since it did not pass first lock + if itmLock, hasKey := Guardian2.locks[lkID]; !hasKey { + t.Errorf("Cannot find lock for lockID: %s", lkID) + } else if itmLock.cnt != eCnt { + t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID) + } + Guardian2.lkMux.Unlock() + time.Sleep(lockDur + secLockDur + 50*time.Millisecond) // give time to unlock before proceeding + + // make sure all counters were removed + for _, lockID := range lockIDs { + if _, hasKey := Guardian2.locks[lockID]; hasKey { + t.Errorf("Unexpected lockID found: %s", lockID) + } + } + // test lock without timer + refID := Guardian2.GuardIDs("", 0, lockIDs...) + + if totalLockDur := time.Now().Sub(tStart); totalLockDur < lockDur { + t.Errorf("Lock duration too small") + } + time.Sleep(30 * time.Millisecond) + // making sure the items stay locked + Guardian2.lkMux.Lock() + if len(Guardian2.locks) != 3 { + t.Errorf("locks should have 3 elements, have: %+v", Guardian2.locks) + } + for _, lkID := range lockIDs { + if itmLock, hasKey := Guardian2.locks[lkID]; !hasKey { + t.Errorf("Cannot find lock for lockID: %s", lkID) + } else if itmLock.cnt != 1 { + t.Errorf("Unexpected counter: %d for itmLock with id %s", itmLock.cnt, lkID) + } + } + Guardian2.lkMux.Unlock() + Guardian2.UnguardIDs(refID) + // make sure items were unlocked + Guardian2.lkMux.Lock() + if len(Guardian2.locks) != 0 { + t.Errorf("locks should have 0 elements, has: %+v", Guardian2.locks) + } + Guardian2.lkMux.Unlock() +} + +// TestGuardian2GuardIDsConcurrent executes GuardIDs concurrently +func TestGuardian2GuardIDsConcurrent(t *testing.T) { + maxIter := 500 + sg := new(sync.WaitGroup) + keys := []string{"test1", "test2", "test3"} + refID := utils.GenUUID() + for i := 0; i < maxIter; i++ { + sg.Add(1) + go func() { + if retRefID := Guardian2.GuardIDs(refID, 0, keys...); retRefID != "" { + if lkIDs := Guardian2.UnguardIDs(refID); !reflect.DeepEqual(keys, lkIDs) { + t.Errorf("expecting: %+v, received: %+v", keys, lkIDs) + } + } + sg.Done() + }() + } + sg.Wait() + + Guardian2.lkMux.Lock() + if len(Guardian2.locks) != 0 { + t.Errorf("Possible memleak for locks: %+v", Guardian2.locks) + } + Guardian2.lkMux.Unlock() + Guardian2.refsMux.Lock() + if len(Guardian2.refs) != 0 { + t.Errorf("Possible memleak for refs: %+v", Guardian2.refs) + } + Guardian2.refsMux.Unlock() +} + +func TestGuardian2GuardIDsTimeoutConcurrent(t *testing.T) { + maxIter := 50 + sg := new(sync.WaitGroup) + keys := []string{"test1", "test2", "test3"} + refID := utils.GenUUID() + for i := 0; i < maxIter; i++ { + sg.Add(1) + go func() { + Guardian2.GuardIDs(refID, time.Microsecond, keys...) + sg.Done() + }() + } + sg.Wait() + time.Sleep(10 * time.Millisecond) + Guardian2.lkMux.Lock() + if len(Guardian2.locks) != 0 { + t.Errorf("Possible memleak for locks: %+v", Guardian2.locks) + } + Guardian2.lkMux.Unlock() + Guardian2.refsMux.Lock() + if len(Guardian2.refs) != 0 { + t.Errorf("Possible memleak for refs: %+v", Guardian2.refs) + } + Guardian2.refsMux.Unlock() +} + +// BenchmarkGuard-8 200000 13759 ns/op +func BenchmarkGuard2(b *testing.B) { + wg := new(sync.WaitGroup) + wg.Add(b.N * 3) + b.ResetTimer() + for n := 0; n < b.N; n++ { + go func() { + Guardian2.Guard(func() error { + time.Sleep(time.Microsecond) + return nil + }, 0, "1") + wg.Done() + }() + go func() { + Guardian2.Guard(func() error { + time.Sleep(time.Microsecond) + return nil + }, 0, "2") + wg.Done() + }() + go func() { + Guardian2.Guard(func() error { + time.Sleep(time.Microsecond) + return nil + }, 0, "1") + wg.Done() + }() + } + wg.Wait() +} + +// BenchmarkGuardian-8 1000000 5794 ns/op +func BenchmarkGuardian2(b *testing.B) { + wg := new(sync.WaitGroup) + wg.Add(b.N) + b.ResetTimer() + for n := 0; n < b.N; n++ { + go func(n int) { + Guardian2.Guard(func() error { + time.Sleep(time.Microsecond) + return nil + }, 0, strconv.Itoa(n)) + wg.Done() + }(n) + } + wg.Wait() +} + +// BenchmarkGuardIDs-8 1000000 8732 ns/op +func BenchmarkGuardIDs2(b *testing.B) { + wg := new(sync.WaitGroup) + wg.Add(b.N) + b.ResetTimer() + for n := 0; n < b.N; n++ { + go func(i int) { + if refID := Guardian2.GuardIDs("", 0, strconv.Itoa(i)); refID != "" { + time.Sleep(time.Microsecond) + Guardian2.UnguardIDs(refID) + } + wg.Done() + }(n) + } + wg.Wait() +} + +func TestGuardian2LockItemUnlockItem(t *testing.T) { + //for coverage purposes + itemID := utils.EmptyString + Guardian2.lockItem(itemID) + Guardian2.unlockItem(itemID) + if itemID != utils.EmptyString { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.EmptyString, itemID) + } +} + +func TestGuardian2LockUnlockWithReference(t *testing.T) { + //for coverage purposes + refID := utils.EmptyString + Guardian2.lockWithReference(refID, 0) + Guardian2.unlockWithReference(refID) + if refID != utils.EmptyString { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.EmptyString, refID) + } +} + +func TestGuardian2GuardUnguardIDs(t *testing.T) { + //for coverage purposes + refID := utils.EmptyString + lkIDs := []string{"test1", "test2", "test3"} + Guardian2.GuardIDs(refID, time.Second, lkIDs...) + Guardian2.UnguardIDs(refID) + if refID != utils.EmptyString { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.EmptyString, refID) + } +} + +func TestGuardian2GuardUnguardIDsCase2(t *testing.T) { + //for coverage purposes + lkIDs := []string{"test1", "test2", "test3"} + err := Guardian2.Guard(func() error { + return utils.ErrNotFound + }, 10*time.Millisecond, lkIDs...) + if err == nil || err != utils.ErrNotFound { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ErrNotFound, err) + } +} diff --git a/guardian/guardian_test.go b/guardian/guardian_test.go index dd576e116..84d76ae17 100644 --- a/guardian/guardian_test.go +++ b/guardian/guardian_test.go @@ -240,43 +240,68 @@ func TestGuardianGuardIDsTimeoutConcurrent(t *testing.T) { // BenchmarkGuard-8 200000 13759 ns/op func BenchmarkGuard(b *testing.B) { + wg := new(sync.WaitGroup) + wg.Add(b.N * 3) + b.ResetTimer() for n := 0; n < b.N; n++ { - go Guardian.Guard(func() (interface{}, error) { - time.Sleep(time.Microsecond) - return 0, nil - }, 0, "1") - go Guardian.Guard(func() (interface{}, error) { - time.Sleep(time.Microsecond) - return 0, nil - }, 0, "2") - go Guardian.Guard(func() (interface{}, error) { - time.Sleep(time.Microsecond) - return 0, nil - }, 0, "1") + go func() { + Guardian.Guard(func() (interface{}, error) { + time.Sleep(time.Microsecond) + return 0, nil + }, 0, "1") + wg.Done() + }() + go func() { + Guardian.Guard(func() (interface{}, error) { + time.Sleep(time.Microsecond) + return 0, nil + }, 0, "2") + wg.Done() + }() + go func() { + Guardian.Guard(func() (interface{}, error) { + time.Sleep(time.Microsecond) + return 0, nil + }, 0, "1") + wg.Done() + }() } + wg.Wait() } // BenchmarkGuardian-8 1000000 5794 ns/op func BenchmarkGuardian(b *testing.B) { + wg := new(sync.WaitGroup) + wg.Add(b.N) + b.ResetTimer() for n := 0; n < b.N; n++ { - go Guardian.Guard(func() (interface{}, error) { - time.Sleep(time.Microsecond) - return 0, nil - }, 0, strconv.Itoa(n)) + go func(n int) { + Guardian.Guard(func() (interface{}, error) { + time.Sleep(time.Microsecond) + return 0, nil + }, 0, strconv.Itoa(n)) + wg.Done() + }(n) } + wg.Wait() } // BenchmarkGuardIDs-8 1000000 8732 ns/op func BenchmarkGuardIDs(b *testing.B) { + wg := new(sync.WaitGroup) + wg.Add(b.N) + b.ResetTimer() for n := 0; n < b.N; n++ { go func(i int) { if refID := Guardian.GuardIDs("", 0, strconv.Itoa(i)); refID != "" { time.Sleep(time.Microsecond) Guardian.UnguardIDs(refID) } + wg.Done() }(n) } + wg.Wait() } func TestGuardianLockItemUnlockItem(t *testing.T) {