From b32e441505da2570c6ef5739340f07154475b926 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 23 Jun 2015 17:46:09 +0300 Subject: [PATCH] better stats shutdown --- engine/responder.go | 5 ++++- engine/stats.go | 48 +++++++++++++++++++++++++++++++++++---------- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/engine/responder.go b/engine/responder.go index 4aadfaec8..5f8411b4f 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -285,7 +285,10 @@ func (rs *Responder) Shutdown(arg string, reply *string) (err error) { if rs.Bal != nil { rs.Bal.Shutdown("Responder.Shutdown") } - ratingStorage.(Storage).Close() + ratingStorage.Close() + accountingStorage.Close() + storageLogger.Close() + cdrStorage.Close() defer func() { rs.ExitChan <- true }() *reply = "Done!" return diff --git a/engine/stats.go b/engine/stats.go index 7d494965b..861d122d8 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -35,6 +35,7 @@ type StatsInterface interface { AddQueue(*CdrStats, *int) error ReloadQueues([]string, *int) error ResetQueues([]string, *int) error + Stop(int, *int) error } type Stats struct { @@ -45,34 +46,48 @@ type Stats struct { accountingDb AccountingStorage defaultSaveInterval time.Duration } + +type saveFunc func(*queueSaver) + type queueSaver struct { - ticker *time.Ticker - stopper chan bool + ticker *time.Ticker + stopper chan bool + save saveFunc + id string + saveInterval time.Duration + sq *StatsQueue + adb AccountingStorage } func newQueueSaver(id string, saveInterval time.Duration, sq *StatsQueue, adb AccountingStorage) *queueSaver { svr := &queueSaver{ - ticker: time.NewTicker(saveInterval), - stopper: make(chan bool), + ticker: time.NewTicker(saveInterval), + stopper: make(chan bool), + id: id, + saveInterval: saveInterval, + sq: sq, + adb: adb, } - go func(id string, c <-chan time.Time, s <-chan bool, sq *StatsQueue, accountDb AccountingStorage) { + svr.save = func(svr *queueSaver) { for { select { - case <-c: - if sq.IsDirty() { - if err := accountDb.SetCdrStatsQueue(sq); err != nil { + case <-svr.ticker.C: + if svr.sq.IsDirty() { + if err := svr.adb.SetCdrStatsQueue(svr.sq); err != nil { Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", id, err)) } } - case <-s: + case <-svr.stopper: break } } - }(id, svr.ticker.C, svr.stopper, sq, adb) + } + go svr.save(svr) return svr } func (svr *queueSaver) stop() { + svr.save(svr) svr.ticker.Stop() svr.stopper <- true } @@ -248,6 +263,15 @@ func (s *Stats) AppendCDR(cdr *StoredCdr, out *int) error { return nil } +func (s *Stats) Stop(int, *int) error { + s.mux.RLock() + defer s.mux.RUnlock() + for _, saver := range s.queueSavers { + saver.stop() + } + return nil +} + type ProxyStats struct { Client *rpcclient.RpcClient } @@ -283,3 +307,7 @@ func (ps *ProxyStats) ReloadQueues(ids []string, out *int) error { func (ps *ProxyStats) ResetQueues(ids []string, out *int) error { return ps.Client.Call("Stats.ResetQueues", ids, out) } + +func (ps *ProxyStats) Stop(i int, r *int) error { + return ps.Client.Call("Stats.Stop", 0, i) +}