mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
better stats shutdown
This commit is contained in:
@@ -285,7 +285,10 @@ func (rs *Responder) Shutdown(arg string, reply *string) (err error) {
|
||||
if rs.Bal != nil {
|
||||
rs.Bal.Shutdown("Responder.Shutdown")
|
||||
}
|
||||
ratingStorage.(Storage).Close()
|
||||
ratingStorage.Close()
|
||||
accountingStorage.Close()
|
||||
storageLogger.Close()
|
||||
cdrStorage.Close()
|
||||
defer func() { rs.ExitChan <- true }()
|
||||
*reply = "Done!"
|
||||
return
|
||||
|
||||
@@ -35,6 +35,7 @@ type StatsInterface interface {
|
||||
AddQueue(*CdrStats, *int) error
|
||||
ReloadQueues([]string, *int) error
|
||||
ResetQueues([]string, *int) error
|
||||
Stop(int, *int) error
|
||||
}
|
||||
|
||||
type Stats struct {
|
||||
@@ -45,34 +46,48 @@ type Stats struct {
|
||||
accountingDb AccountingStorage
|
||||
defaultSaveInterval time.Duration
|
||||
}
|
||||
|
||||
type saveFunc func(*queueSaver)
|
||||
|
||||
type queueSaver struct {
|
||||
ticker *time.Ticker
|
||||
stopper chan bool
|
||||
ticker *time.Ticker
|
||||
stopper chan bool
|
||||
save saveFunc
|
||||
id string
|
||||
saveInterval time.Duration
|
||||
sq *StatsQueue
|
||||
adb AccountingStorage
|
||||
}
|
||||
|
||||
func newQueueSaver(id string, saveInterval time.Duration, sq *StatsQueue, adb AccountingStorage) *queueSaver {
|
||||
svr := &queueSaver{
|
||||
ticker: time.NewTicker(saveInterval),
|
||||
stopper: make(chan bool),
|
||||
ticker: time.NewTicker(saveInterval),
|
||||
stopper: make(chan bool),
|
||||
id: id,
|
||||
saveInterval: saveInterval,
|
||||
sq: sq,
|
||||
adb: adb,
|
||||
}
|
||||
go func(id string, c <-chan time.Time, s <-chan bool, sq *StatsQueue, accountDb AccountingStorage) {
|
||||
svr.save = func(svr *queueSaver) {
|
||||
for {
|
||||
select {
|
||||
case <-c:
|
||||
if sq.IsDirty() {
|
||||
if err := accountDb.SetCdrStatsQueue(sq); err != nil {
|
||||
case <-svr.ticker.C:
|
||||
if svr.sq.IsDirty() {
|
||||
if err := svr.adb.SetCdrStatsQueue(svr.sq); err != nil {
|
||||
Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", id, err))
|
||||
}
|
||||
}
|
||||
case <-s:
|
||||
case <-svr.stopper:
|
||||
break
|
||||
}
|
||||
}
|
||||
}(id, svr.ticker.C, svr.stopper, sq, adb)
|
||||
}
|
||||
go svr.save(svr)
|
||||
return svr
|
||||
}
|
||||
|
||||
func (svr *queueSaver) stop() {
|
||||
svr.save(svr)
|
||||
svr.ticker.Stop()
|
||||
svr.stopper <- true
|
||||
}
|
||||
@@ -248,6 +263,15 @@ func (s *Stats) AppendCDR(cdr *StoredCdr, out *int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Stats) Stop(int, *int) error {
|
||||
s.mux.RLock()
|
||||
defer s.mux.RUnlock()
|
||||
for _, saver := range s.queueSavers {
|
||||
saver.stop()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type ProxyStats struct {
|
||||
Client *rpcclient.RpcClient
|
||||
}
|
||||
@@ -283,3 +307,7 @@ func (ps *ProxyStats) ReloadQueues(ids []string, out *int) error {
|
||||
func (ps *ProxyStats) ResetQueues(ids []string, out *int) error {
|
||||
return ps.Client.Call("Stats.ResetQueues", ids, out)
|
||||
}
|
||||
|
||||
func (ps *ProxyStats) Stop(i int, r *int) error {
|
||||
return ps.Client.Call("Stats.Stop", 0, i)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user