From cf7b11e765ef38dcf755b89186b64a7f6e36276d Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 29 Jun 2015 23:29:25 +0300 Subject: [PATCH] cleanner queue saver --- apier/v1/cdrstatsv1_local_test.go | 7 ++++--- engine/stats.go | 34 +++++++++---------------------- engine/stats_queue.go | 20 ++++++++++-------- engine/storage_redis.go | 1 - 4 files changed, 25 insertions(+), 37 deletions(-) diff --git a/apier/v1/cdrstatsv1_local_test.go b/apier/v1/cdrstatsv1_local_test.go index bc6f59355..1d5d2f4d9 100644 --- a/apier/v1/cdrstatsv1_local_test.go +++ b/apier/v1/cdrstatsv1_local_test.go @@ -20,9 +20,6 @@ package v1 import ( "fmt" - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" "net/http" "net/rpc" "net/rpc/jsonrpc" @@ -30,6 +27,10 @@ import ( "reflect" "testing" "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" ) var cdrstCfgPath string diff --git a/engine/stats.go b/engine/stats.go index cb5f84986..bdcc6282c 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -19,7 +19,6 @@ along with this program. If not, see package engine import ( - "errors" "fmt" "sync" "time" @@ -47,51 +46,38 @@ type Stats struct { defaultSaveInterval time.Duration } -type saveFunc func(*queueSaver) - type queueSaver struct { ticker *time.Ticker stopper chan bool - save saveFunc - id string - saveInterval time.Duration + save func(*queueSaver) sq *StatsQueue - adb AccountingStorage + accountingDb AccountingStorage } -func newQueueSaver(id string, saveInterval time.Duration, sq *StatsQueue, adb AccountingStorage) *queueSaver { +func newQueueSaver(saveInterval time.Duration, sq *StatsQueue, adb AccountingStorage) *queueSaver { svr := &queueSaver{ ticker: time.NewTicker(saveInterval), stopper: make(chan bool), - id: id, - saveInterval: saveInterval, sq: sq, - adb: adb, + accountingDb: adb, } - svr.save = func(svr *queueSaver) { + go func(saveInterval time.Duration, sq *StatsQueue, adb AccountingStorage) { for { select { case <-svr.ticker.C: - if svr.sq.IsDirty() { - svr.sq.mux.Lock() - if err := svr.adb.SetCdrStatsQueue(svr.sq); err != nil { - Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", id, err)) - } - svr.sq.mux.Unlock() - } + sq.Save(adb) case <-svr.stopper: break } } - } - go svr.save(svr) + }(saveInterval, sq, adb) return svr } func (svr *queueSaver) stop() { - svr.save(svr) svr.ticker.Stop() svr.stopper <- true + svr.sq.Save(svr.accountingDb) } func NewStats(ratingDb RatingStorage, accountingDb AccountingStorage, saveInterval time.Duration) *Stats { @@ -122,7 +108,7 @@ func (s *Stats) GetValues(sqID string, values *map[string]float64) error { *values = sq.GetStats() return nil } - return errors.New("Not Found") + return utils.ErrNotFound } func (s *Stats) AddQueue(cs *CdrStats, out *int) error { @@ -252,7 +238,7 @@ func (s *Stats) setupQueueSaver(sq *StatsQueue) { si = s.defaultSaveInterval } if si > 0 { - s.queueSavers[sq.GetId()] = newQueueSaver(sq.GetId(), si, sq, s.accountingDb) + s.queueSavers[sq.GetId()] = newQueueSaver(si, sq, s.accountingDb) } } diff --git a/engine/stats_queue.go b/engine/stats_queue.go index 911532406..048b82cec 100644 --- a/engine/stats_queue.go +++ b/engine/stats_queue.go @@ -19,6 +19,7 @@ along with this program. If not, see package engine import ( + "fmt" "strings" "sync" "time" @@ -84,6 +85,16 @@ func (sq *StatsQueue) UpdateConf(conf *CdrStats) { } } +func (sq *StatsQueue) Save(adb AccountingStorage) { + sq.mux.Lock() + defer sq.mux.Unlock() + if sq.dirty { + if err := adb.SetCdrStatsQueue(sq); err != nil { + Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", sq.GetId(), err)) + } + } +} + func (sq *StatsQueue) Load(saved *StatsQueue) { sq.mux.Lock() defer sq.mux.Unlock() @@ -95,15 +106,6 @@ func (sq *StatsQueue) Load(saved *StatsQueue) { } } -func (sq *StatsQueue) IsDirty() bool { - sq.mux.Lock() - defer sq.mux.Unlock() - v := sq.dirty - // take advantage of the locking to set it to flip it - sq.dirty = false - return v -} - func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) { sq.mux.Lock() defer sq.mux.Unlock() diff --git a/engine/storage_redis.go b/engine/storage_redis.go index a367a1878..131c70280 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -680,7 +680,6 @@ func (rs *RedisStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) sq = &StatsQueue{Metrics: make(map[string]Metric)} err = rs.ms.Unmarshal(values, sq) } - return }