improved guardian

This commit is contained in:
Radu Ioan Fericean
2015-12-14 16:53:51 +02:00
parent 1cc6955bfa
commit ec003006b0
3 changed files with 40 additions and 30 deletions

View File

@@ -67,7 +67,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
}
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, rater Connector, pubsub PublisherSubscriber, users UserService, aliases AliasService, stats StatsInterface) (*CdrServer, error) {
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{queue: make(map[string]chan bool)}}, nil
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, rater: rater, pubsub: pubsub, users: users, aliases: aliases, stats: stats, guard: &GuardianLock{locksMap: make(map[string]chan bool)}}, nil
}
type CdrServer struct {

View File

@@ -24,28 +24,35 @@ import (
)
// global package variable
var Guardian = &GuardianLock{queue: make(map[string]chan bool)}
var Guardian = &GuardianLock{locksMap: make(map[string]chan bool)}
func NewGuardianLock() *GuardianLock {
return &GuardianLock{queue: make(map[string]chan bool)}
return &GuardianLock{locksMap: make(map[string]chan bool)}
}
type GuardianLock struct {
queue map[string]chan bool
mu sync.Mutex
locksMap map[string]chan bool
mu sync.RWMutex
}
func (cm *GuardianLock) Guard(handler func() (interface{}, error), timeout time.Duration, names ...string) (reply interface{}, err error) {
var locks []chan bool // take existing locks out of the mutex
cm.mu.Lock()
for _, name := range names {
lock, exists := Guardian.queue[name]
if !exists {
if lock, exists := Guardian.locksMap[name]; !exists {
lock = make(chan bool, 1)
Guardian.queue[name] = lock
Guardian.locksMap[name] = lock
lock <- true
} else {
locks = append(locks, lock)
}
lock <- true
}
cm.mu.Unlock()
for _, lock := range locks {
lock <- true
}
funcWaiter := make(chan bool)
go func() {
// execute
@@ -62,9 +69,10 @@ func (cm *GuardianLock) Guard(handler func() (interface{}, error), timeout time.
<-funcWaiter
}
// release
cm.mu.RLock()
for _, name := range names {
lock := Guardian.queue[name]
<-lock
<-Guardian.locksMap[name]
}
cm.mu.RUnlock()
return
}

View File

@@ -25,23 +25,25 @@ import (
)
func ATestAccountLock(t *testing.T) {
go Guardian.Guard(func() (interface{}, error) {
log.Print("first 1")
time.Sleep(1 * time.Second)
log.Print("end first 1")
return 0, nil
}, 0, "1")
go Guardian.Guard(func() (interface{}, error) {
log.Print("first 2")
time.Sleep(1 * time.Second)
log.Print("end first 2")
return 0, nil
}, 0, "2")
go Guardian.Guard(func() (interface{}, error) {
log.Print("second 1")
time.Sleep(1 * time.Second)
log.Print("end second 1")
return 0, nil
}, 0, "1")
time.Sleep(3 * time.Second)
for i := 0; i < 100; i++ {
go Guardian.Guard(func() (interface{}, error) {
log.Print("first 1")
time.Sleep(1 * time.Millisecond)
log.Print("end first 1")
return 0, nil
}, 0, "1")
go Guardian.Guard(func() (interface{}, error) {
log.Print("first 2")
time.Sleep(1 * time.Millisecond)
log.Print("end first 2")
return 0, nil
}, 0, "2")
go Guardian.Guard(func() (interface{}, error) {
log.Print("second 1")
time.Sleep(1 * time.Millisecond)
log.Print("end second 1")
return 0, nil
}, 0, "1")
}
time.Sleep(10 * time.Second)
}