mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-13 02:56:24 +05:00
StatsEvents saved on db for each queue
This commit is contained in:
@@ -42,7 +42,7 @@ type StatsMetric interface {
|
||||
addEvent(ev StatsEvent) error
|
||||
remEvent(ev StatsEvent) error
|
||||
getStoredValues() ([]byte, error) // used to generate the values which are stored into DB
|
||||
loadStoredValues([]byte) error // load the values from DB
|
||||
loadStoredValues([]byte) error // load the values from DB data
|
||||
}
|
||||
|
||||
func NewStatsASR() (StatsMetric, error) {
|
||||
|
||||
@@ -18,7 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package engine
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -34,7 +33,8 @@ type SQItem struct {
|
||||
|
||||
// SQStored contains values saved in DB on store
|
||||
type StoredSQ struct {
|
||||
SQItems []*SQItem
|
||||
SEvents map[string]StatsEvent // Events used by SQItems
|
||||
SQItems []*SQItem // SQItems
|
||||
SQMetrics map[string][]byte
|
||||
}
|
||||
|
||||
@@ -63,6 +63,9 @@ func (sq *StatsQueue) Init(sec *StatsEventCache, storedSQ *StoredSQ) (err error)
|
||||
if storedSQ == nil {
|
||||
return
|
||||
}
|
||||
for evID, ev := range storedSQ.SEvents {
|
||||
sq.sec.Cache(evID, ev, sq.ID)
|
||||
}
|
||||
sq.sqItems = storedSQ.SQItems
|
||||
for metricID := range sq.sqMetrics {
|
||||
if sq.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil {
|
||||
@@ -78,23 +81,46 @@ func (sq *StatsQueue) Init(sec *StatsEventCache, storedSQ *StoredSQ) (err error)
|
||||
}
|
||||
|
||||
// GetStoredSQ retrieves the data used for store to DB
|
||||
func (sq *StatsQueue) GetStoredSQ() (sSQ *StoredSQ, err error) {
|
||||
func (sq *StatsQueue) GetStoredSQ() (sSQ *StoredSQ) {
|
||||
sq.RLock()
|
||||
defer sq.RUnlock()
|
||||
if !sq.Store {
|
||||
return nil, errors.New("not storable")
|
||||
sEvents := make(map[string]StatsEvent)
|
||||
var sItems []*SQItem
|
||||
for _, sqItem := range sq.sqItems { // make sure event is properly retrieved from cache
|
||||
ev := sq.sec.GetEvent(sqItem.EventID)
|
||||
if ev == nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<StatsQueue> querying for storage eventID: %s, error: event not cached",
|
||||
sqItem.EventID))
|
||||
continue
|
||||
}
|
||||
sEvents[sqItem.EventID] = ev
|
||||
sItems = append(sItems, sqItem)
|
||||
}
|
||||
sSQ = &StoredSQ{
|
||||
SQItems: sq.sqItems,
|
||||
SEvents: sEvents,
|
||||
SQItems: sItems,
|
||||
SQMetrics: make(map[string][]byte, len(sq.sqMetrics))}
|
||||
for metricID, metric := range sq.sqMetrics {
|
||||
var err error
|
||||
if sSQ.SQMetrics[metricID], err = metric.getStoredValues(); err != nil {
|
||||
return nil, err
|
||||
utils.Logger.Warning(fmt.Sprintf("<StatsQueue> querying for storage metricID: %s, error: %s",
|
||||
metricID, err.Error()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ProcessEvent processes a StatsEvent, returns true if processed
|
||||
func (sq *StatsQueue) ProcessEvent(ev StatsEvent) (err error) {
|
||||
sq.Lock()
|
||||
sq.remExpired()
|
||||
sq.remOnQueueLength()
|
||||
sq.addStatsEvent(ev)
|
||||
sq.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// remExpired expires items in queue
|
||||
func (sq *StatsQueue) remExpired() {
|
||||
var expIdx *int // index of last item to be expired
|
||||
@@ -153,13 +179,3 @@ func (sq *StatsQueue) remEventWithID(evID string) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ProcessEvent processes a StatsEvent, returns true if processed
|
||||
func (sq *StatsQueue) ProcessEvent(ev StatsEvent) (err error) {
|
||||
sq.Lock()
|
||||
sq.remExpired()
|
||||
sq.remOnQueueLength()
|
||||
sq.addStatsEvent(ev)
|
||||
sq.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user