Merge fix

This commit is contained in:
DanB
2015-06-30 10:20:37 +02:00
4 changed files with 25 additions and 40 deletions

View File

@@ -20,9 +20,6 @@ package v1
import (
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"net/http"
"net/rpc"
"net/rpc/jsonrpc"
@@ -30,6 +27,10 @@ import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var cdrstCfgPath string

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"errors"
"fmt"
"sync"
"time"
@@ -47,54 +46,38 @@ type Stats struct {
defaultSaveInterval time.Duration
}
type saveFunc func(*queueSaver)
type queueSaver struct {
ticker *time.Ticker
stopper chan bool
save saveFunc
id string
saveInterval time.Duration
save func(*queueSaver)
sq *StatsQueue
adb AccountingStorage
accountingDb AccountingStorage
}
func newQueueSaver(id string, saveInterval time.Duration, sq *StatsQueue, adb AccountingStorage) *queueSaver {
func newQueueSaver(saveInterval time.Duration, sq *StatsQueue, adb AccountingStorage) *queueSaver {
svr := &queueSaver{
ticker: time.NewTicker(saveInterval),
stopper: make(chan bool),
id: id,
saveInterval: saveInterval,
sq: sq,
adb: adb,
accountingDb: adb,
}
svr.save = func(svr *queueSaver) {
//Logger.Debug(fmt.Sprintf("svr.save, statsQueue: %+v", sq))
go func(saveInterval time.Duration, sq *StatsQueue, adb AccountingStorage) {
for {
select {
case <-svr.ticker.C:
//Logger.Debug(fmt.Sprintf("svr.ticket.C, statsQueue: %+v", sq))
if svr.sq.IsDirty() {
//Logger.Debug("svr.sq.IsDirty")
svr.sq.mux.Lock()
if err := svr.adb.SetCdrStatsQueue(svr.sq); err != nil {
Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", id, err))
}
svr.sq.mux.Unlock()
}
sq.Save(adb)
case <-svr.stopper:
break
}
}
}
go svr.save(svr)
}(saveInterval, sq, adb)
return svr
}
func (svr *queueSaver) stop() {
svr.save(svr)
svr.ticker.Stop()
svr.stopper <- true
svr.sq.Save(svr.accountingDb)
}
func NewStats(ratingDb RatingStorage, accountingDb AccountingStorage, saveInterval time.Duration) *Stats {
@@ -125,7 +108,7 @@ func (s *Stats) GetValues(sqID string, values *map[string]float64) error {
*values = sq.GetStats()
return nil
}
return errors.New("Not Found")
return utils.ErrNotFound
}
func (s *Stats) AddQueue(cs *CdrStats, out *int) error {
@@ -249,7 +232,7 @@ func (s *Stats) setupQueueSaver(sq *StatsQueue) {
si = s.defaultSaveInterval
}
if si > 0 {
s.queueSavers[sq.GetId()] = newQueueSaver(sq.GetId(), si, sq, s.accountingDb)
s.queueSavers[sq.GetId()] = newQueueSaver(si, sq, s.accountingDb)
}
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"fmt"
"strings"
"sync"
"time"
@@ -84,6 +85,16 @@ func (sq *StatsQueue) UpdateConf(conf *CdrStats) {
}
}
func (sq *StatsQueue) Save(adb AccountingStorage) {
sq.mux.Lock()
defer sq.mux.Unlock()
if sq.dirty {
if err := adb.SetCdrStatsQueue(sq); err != nil {
Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", sq.GetId(), err))
}
}
}
func (sq *StatsQueue) Load(saved *StatsQueue) {
sq.mux.Lock()
defer sq.mux.Unlock()
@@ -95,15 +106,6 @@ func (sq *StatsQueue) Load(saved *StatsQueue) {
}
}
func (sq *StatsQueue) IsDirty() bool {
sq.mux.Lock()
defer sq.mux.Unlock()
v := sq.dirty
// take advantage of the locking to set it to flip it
sq.dirty = false
return v
}
func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) {
sq.mux.Lock()
defer sq.mux.Unlock()

View File

@@ -680,7 +680,6 @@ func (rs *RedisStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error)
sq = &StatsQueue{Metrics: make(map[string]Metric)}
err = rs.ms.Unmarshal(values, sq)
}
return
}