From f9b068fba6b059bee04094d04ff5e0c8e60d64d7 Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 27 Aug 2016 09:32:36 +0200 Subject: [PATCH] Memory leak fix, improved engine lockig through Mutexes, tests attached --- engine/guardian.go | 89 +++++++++++++++++++++++++++-------------- engine/guardian_test.go | 63 ++++++++++++++++++++++++++++- utils/map_test.go | 18 +++++++++ 3 files changed, 137 insertions(+), 33 deletions(-) diff --git a/engine/guardian.go b/engine/guardian.go index 6d6664e39..6288ef68b 100644 --- a/engine/guardian.go +++ b/engine/guardian.go @@ -1,6 +1,6 @@ /* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2012-2015 ITsysCOM +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -24,51 +24,78 @@ import ( ) // global package variable -var Guardian = &GuardianLock{locksMap: make(map[string]chan bool)} +var Guardian = &GuardianLock{locksMap: make(map[string]*itemLock)} -type GuardianLock struct { - locksMap map[string]chan bool - mu sync.RWMutex +func newItemLock(keyID string) *itemLock { + return &itemLock{keyID: keyID, lk: new(sync.Mutex)} } -func (cm *GuardianLock) Guard(handler func() (interface{}, error), timeout time.Duration, names ...string) (reply interface{}, err error) { - var locks []chan bool // take existing locks out of the mutex - cm.mu.Lock() +// itemLock represents one lock with key autodestroy +type itemLock struct { + keyID string // store it so we know what to destroy + lk *sync.Mutex + cnt int +} + +// lock() keeps also record of running jobs on same item +func (il *itemLock) lock() { + il.lk.Lock() + il.cnt++ +} + +// unlock() executes combined lock with autoremoving lock from Guardian +func (il *itemLock) unlock() { + il.cnt-- + if il.cnt == 0 { + Guardian.Lock() + delete(Guardian.locksMap, il.keyID) + Guardian.Unlock() + } + il.lk.Unlock() +} + +// GuardianLock is an optimized locking system per locking key +type GuardianLock struct { + locksMap map[string]*itemLock + sync.Mutex +} + +func (guard *GuardianLock) Guard(handler func() (interface{}, error), timeout time.Duration, names ...string) (reply interface{}, err error) { + var itmLocks []*itemLock // will need to lock all of them before proceeding with our task + guard.Lock() for _, name := range names { - if lock, exists := Guardian.locksMap[name]; !exists { - lock = make(chan bool, 1) - Guardian.locksMap[name] = lock - lock <- true - } else { - locks = append(locks, lock) + var itmLock *itemLock + itmLock, exists := Guardian.locksMap[name] + if !exists { + itmLock = newItemLock(name) + Guardian.locksMap[name] = itmLock } + itmLocks = append(itmLocks, itmLock) } - cm.mu.Unlock() + guard.Unlock() - for _, lock := range locks { - lock <- true + for _, itmLock := range itmLocks { + itmLock.lock() } - funcWaiter := make(chan bool) - go func() { + handlerDone := make(chan struct{}) + go func(chan struct{}) { // execute reply, err = handler() - funcWaiter <- true - }() - // wait with timeout - if timeout > 0 { + handlerDone <- struct{}{} + }(handlerDone) + + if timeout > 0 { // wait with timeout select { - case <-funcWaiter: + case <-handlerDone: case <-time.After(timeout): } - } else { - <-funcWaiter + } else { // a bit dangerous but wait till handler finishes + <-handlerDone } // release - cm.mu.RLock() - for _, name := range names { - <-Guardian.locksMap[name] + for _, itmLock := range itmLocks { + itmLock.unlock() } - cm.mu.RUnlock() return } diff --git a/engine/guardian_test.go b/engine/guardian_test.go index 6e436e9d5..968fa66f1 100644 --- a/engine/guardian_test.go +++ b/engine/guardian_test.go @@ -1,6 +1,6 @@ /* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2012-2015 ITsysCOM +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -19,10 +19,69 @@ along with this program. If not, see package engine import ( + "sync" "testing" "time" ) +func delayHandler() (interface{}, error) { + time.Sleep(100 * time.Millisecond) + return nil, nil +} + +// Forks 3 groups of workers and makes sure that the time for execution is the one we expect for all 15 goroutines (with 100ms ) +func TestGuardianMultipleKeys(t *testing.T) { + tStart := time.Now() + maxIter := 5 + sg := new(sync.WaitGroup) + keys := []string{"test1", "test2", "test3"} + for i := 0; i < maxIter; i++ { + for _, key := range keys { + sg.Add(1) + go func(key string) { + Guardian.Guard(delayHandler, 0, key) + sg.Done() + }(key) + } + } + sg.Wait() + mustExecDur := time.Duration(maxIter*100) * time.Millisecond + if execTime := time.Now().Sub(tStart); execTime < mustExecDur || execTime > mustExecDur+time.Duration(10*time.Millisecond) { + t.Errorf("Execution took: %v", execTime) + } + for _, key := range keys { + if _, hasKey := Guardian.locksMap[key]; hasKey { + t.Error("Possible memleak") + } + } +} + +func TestGuardianTimeout(t *testing.T) { + tStart := time.Now() + maxIter := 5 + sg := new(sync.WaitGroup) + keys := []string{"test1", "test2", "test3"} + for i := 0; i < maxIter; i++ { + for _, key := range keys { + sg.Add(1) + go func(key string) { + Guardian.Guard(delayHandler, time.Duration(10*time.Millisecond), key) + sg.Done() + }(key) + } + } + sg.Wait() + mustExecDur := time.Duration(maxIter*10) * time.Millisecond + if execTime := time.Now().Sub(tStart); execTime < mustExecDur || execTime > mustExecDur+time.Duration(10*time.Millisecond) { + t.Errorf("Execution took: %v", execTime) + } + for _, key := range keys { + if _, hasKey := Guardian.locksMap[key]; hasKey { + t.Error("Possible memleak") + } + } +} + func BenchmarkGuard(b *testing.B) { for i := 0; i < 100; i++ { go Guardian.Guard(func() (interface{}, error) { diff --git a/utils/map_test.go b/utils/map_test.go index b9076f88b..d95227ec6 100644 --- a/utils/map_test.go +++ b/utils/map_test.go @@ -1,3 +1,21 @@ +/* +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + package utils import (