mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
cleanner queue saver
This commit is contained in:
@@ -20,9 +20,6 @@ package v1
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"net/http"
|
||||
"net/rpc"
|
||||
"net/rpc/jsonrpc"
|
||||
@@ -30,6 +27,10 @@ import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
var cdrstCfgPath string
|
||||
|
||||
@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package engine
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -47,51 +46,38 @@ type Stats struct {
|
||||
defaultSaveInterval time.Duration
|
||||
}
|
||||
|
||||
type saveFunc func(*queueSaver)
|
||||
|
||||
type queueSaver struct {
|
||||
ticker *time.Ticker
|
||||
stopper chan bool
|
||||
save saveFunc
|
||||
id string
|
||||
saveInterval time.Duration
|
||||
save func(*queueSaver)
|
||||
sq *StatsQueue
|
||||
adb AccountingStorage
|
||||
accountingDb AccountingStorage
|
||||
}
|
||||
|
||||
func newQueueSaver(id string, saveInterval time.Duration, sq *StatsQueue, adb AccountingStorage) *queueSaver {
|
||||
func newQueueSaver(saveInterval time.Duration, sq *StatsQueue, adb AccountingStorage) *queueSaver {
|
||||
svr := &queueSaver{
|
||||
ticker: time.NewTicker(saveInterval),
|
||||
stopper: make(chan bool),
|
||||
id: id,
|
||||
saveInterval: saveInterval,
|
||||
sq: sq,
|
||||
adb: adb,
|
||||
accountingDb: adb,
|
||||
}
|
||||
svr.save = func(svr *queueSaver) {
|
||||
go func(saveInterval time.Duration, sq *StatsQueue, adb AccountingStorage) {
|
||||
for {
|
||||
select {
|
||||
case <-svr.ticker.C:
|
||||
if svr.sq.IsDirty() {
|
||||
svr.sq.mux.Lock()
|
||||
if err := svr.adb.SetCdrStatsQueue(svr.sq); err != nil {
|
||||
Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", id, err))
|
||||
}
|
||||
svr.sq.mux.Unlock()
|
||||
}
|
||||
sq.Save(adb)
|
||||
case <-svr.stopper:
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
go svr.save(svr)
|
||||
}(saveInterval, sq, adb)
|
||||
return svr
|
||||
}
|
||||
|
||||
func (svr *queueSaver) stop() {
|
||||
svr.save(svr)
|
||||
svr.ticker.Stop()
|
||||
svr.stopper <- true
|
||||
svr.sq.Save(svr.accountingDb)
|
||||
}
|
||||
|
||||
func NewStats(ratingDb RatingStorage, accountingDb AccountingStorage, saveInterval time.Duration) *Stats {
|
||||
@@ -122,7 +108,7 @@ func (s *Stats) GetValues(sqID string, values *map[string]float64) error {
|
||||
*values = sq.GetStats()
|
||||
return nil
|
||||
}
|
||||
return errors.New("Not Found")
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
|
||||
func (s *Stats) AddQueue(cs *CdrStats, out *int) error {
|
||||
@@ -252,7 +238,7 @@ func (s *Stats) setupQueueSaver(sq *StatsQueue) {
|
||||
si = s.defaultSaveInterval
|
||||
}
|
||||
if si > 0 {
|
||||
s.queueSavers[sq.GetId()] = newQueueSaver(sq.GetId(), si, sq, s.accountingDb)
|
||||
s.queueSavers[sq.GetId()] = newQueueSaver(si, sq, s.accountingDb)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -84,6 +85,16 @@ func (sq *StatsQueue) UpdateConf(conf *CdrStats) {
|
||||
}
|
||||
}
|
||||
|
||||
func (sq *StatsQueue) Save(adb AccountingStorage) {
|
||||
sq.mux.Lock()
|
||||
defer sq.mux.Unlock()
|
||||
if sq.dirty {
|
||||
if err := adb.SetCdrStatsQueue(sq); err != nil {
|
||||
Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", sq.GetId(), err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sq *StatsQueue) Load(saved *StatsQueue) {
|
||||
sq.mux.Lock()
|
||||
defer sq.mux.Unlock()
|
||||
@@ -95,15 +106,6 @@ func (sq *StatsQueue) Load(saved *StatsQueue) {
|
||||
}
|
||||
}
|
||||
|
||||
func (sq *StatsQueue) IsDirty() bool {
|
||||
sq.mux.Lock()
|
||||
defer sq.mux.Unlock()
|
||||
v := sq.dirty
|
||||
// take advantage of the locking to set it to flip it
|
||||
sq.dirty = false
|
||||
return v
|
||||
}
|
||||
|
||||
func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) {
|
||||
sq.mux.Lock()
|
||||
defer sq.mux.Unlock()
|
||||
|
||||
@@ -680,7 +680,6 @@ func (rs *RedisStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error)
|
||||
sq = &StatsQueue{Metrics: make(map[string]Metric)}
|
||||
err = rs.ms.Unmarshal(values, sq)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user