diff --git a/stats/service.go b/stats/service.go index 7142d505a..e48e7509d 100644 --- a/stats/service.go +++ b/stats/service.go @@ -20,24 +20,56 @@ package stats import ( "errors" "fmt" + "math/rand" "sync" + "time" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) +func init() { + rand.Seed(time.Now().UnixNano()) +} + // NewStatService initializes a StatService -func NewStatService(dataDB engine.DataDB) (ss *StatService, err error) { - ss = &StatService{dataDB: dataDB} +func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval time.Duration) (ss *StatService, err error) { + ss = &StatService{dataDB: dataDB, ms: ms, storeInterval: storeInterval, + stopStoring: make(chan struct{}), evCache: NewStatsEventCache()} + sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsQueuePrefix) + if err != nil { + return nil, err + } + ss.stInsts = make(StatsInstances, len(sqPrfxs)) + for i, prfx := range sqPrfxs { + sq, err := dataDB.GetStatsQueue(prfx[len(utils.StatsQueuePrefix):], false, utils.NonTransactional) + if err != nil { + return nil, err + } + var sqSM *engine.SQStoredMetrics + if sq.Store { + if sqSM, err = dataDB.GetSQStoredMetrics(sq.ID); err != nil && err != utils.ErrNotFound { + return nil, err + } + } + if ss.stInsts[i], err = NewStatsInstance(ss.evCache, ss.ms, sq, sqSM); err != nil { + return nil, err + } + } + ss.stInsts.Sort() + go ss.dumpStoredMetrics() return } // StatService builds stats for events type StatService struct { sync.RWMutex - dataDB engine.DataDB - stQueues StatsInstances // ordered list of StatsQueues - evCache *StatsEventCache // so we can pass it to queues + dataDB engine.DataDB + ms engine.Marshaler + storeInterval time.Duration + stopStoring chan struct{} + evCache *StatsEventCache // so we can pass it to queues + stInsts StatsInstances // ordered list of StatsQueues } // Called to start the service @@ -47,6 +79,8 @@ func (ss *StatService) ListenAndServe() error { // Called to shutdown the service func (ss *StatService) ServiceShutdown() error { + close(ss.stopStoring) + ss.storeMetrics() return nil } @@ -56,16 +90,43 @@ func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) { if evStatsID == "" { // ID is mandatory return errors.New("missing ID field") } - for _, stQ := range ss.stQueues { - if err := stQ.ProcessEvent(ev); err != nil { - utils.Logger.Warning(fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s", - stQ.cfg.ID, evStatsID, err.Error())) + for _, stInst := range ss.stInsts { + if err := stInst.ProcessEvent(ev); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s", + stInst.cfg.ID, evStatsID, err.Error())) } } return } -// store stores the necessary data to DB -func (ss *StatService) store() (err error) { +// store stores the necessary storedMetrics to dataDB +func (ss *StatService) storeMetrics() { + for _, si := range ss.stInsts { + if !si.cfg.Store || !si.dirty { // no need to save + continue + } + if siSM := si.GetStoredMetrics(); siSM != nil { + if err := ss.dataDB.SetSQStoredMetrics(siSM); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" failed saving StoredMetrics for QueueID: %s, error: %s", + si.cfg.ID, err.Error())) + } + } + // randomize the CPU load and give up thread control + time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) + } return } + +// dumpStoredMetrics regularly dumps metrics to dataDB +func (ss *StatService) dumpStoredMetrics() { + for { + select { + case <-ss.stopStoring: + return + } + ss.storeMetrics() + time.Sleep(ss.storeInterval) + } +} diff --git a/stats/sinstance.go b/stats/sinstance.go index 94a325c81..55253f1d7 100644 --- a/stats/sinstance.go +++ b/stats/sinstance.go @@ -19,6 +19,7 @@ package stats import ( "fmt" + "sort" "sync" "time" @@ -26,8 +27,37 @@ import ( "github.com/cgrates/cgrates/utils" ) +// StatsInstances is a sortable list of StatsInstance type StatsInstances []*StatsInstance +// Sort is part of sort interface, sort based on Weight +func (sis StatsInstances) Sort() { + sort.Slice(sis, func(i, j int) bool { return sis[i].cfg.Weight > sis[j].cfg.Weight }) +} + +// NewStatsInstance instantiates a StatsInstance +func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler, + sqCfg *engine.StatsQueue, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) { + si = &StatsInstance{sec: sec, ms: ms, cfg: sqCfg} + if sqSM != nil { + for evID, ev := range sqSM.SEvents { + si.sec.Cache(evID, ev, si.cfg.ID) + } + si.sqItems = sqSM.SQItems + for metricID := range si.sqMetrics { + if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { + return + } + if stored, has := sqSM.SQMetrics[metricID]; !has { + continue + } else if err = si.sqMetrics[metricID].SetFromMarshaled(stored, ms); err != nil { + return + } + } + } + return +} + // StatsInstance represents an individual stats instance type StatsInstance struct { sync.RWMutex @@ -39,30 +69,6 @@ type StatsInstance struct { cfg *engine.StatsQueue } -// Init prepares a StatsInstance for operations -// Should be executed at server start -func (sq *StatsInstance) Init(sec *StatsEventCache, ms engine.Marshaler, sqSM *engine.SQStoredMetrics) (err error) { - sq.sec = sec - if sqSM == nil { - return - } - for evID, ev := range sqSM.SEvents { - sq.sec.Cache(evID, ev, sq.cfg.ID) - } - sq.sqItems = sqSM.SQItems - for metricID := range sq.sqMetrics { - if sq.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { - return - } - if stored, has := sqSM.SQMetrics[metricID]; !has { - continue - } else if err = sq.sqMetrics[metricID].SetFromMarshaled(stored, ms); err != nil { - return - } - } - return -} - // GetSQStoredMetrics retrieves the data used for store to DB func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) { sq.RLock() diff --git a/stats/sinstance_test.go b/stats/sinstance_test.go new file mode 100644 index 000000000..7735d3643 --- /dev/null +++ b/stats/sinstance_test.go @@ -0,0 +1,44 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package stats + +import ( + "reflect" + "testing" + + "github.com/cgrates/cgrates/engine" +) + +func TestStatsInstancesSort(t *testing.T) { + sInsts := StatsInstances{ + &StatsInstance{cfg: &engine.StatsQueue{ID: "FIRST", Weight: 30.0}}, + &StatsInstance{cfg: &engine.StatsQueue{ID: "SECOND", Weight: 40.0}}, + &StatsInstance{cfg: &engine.StatsQueue{ID: "THIRD", Weight: 30.0}}, + &StatsInstance{cfg: &engine.StatsQueue{ID: "FOURTH", Weight: 35.0}}, + } + sInsts.Sort() + eSInst := StatsInstances{ + &StatsInstance{cfg: &engine.StatsQueue{ID: "SECOND", Weight: 40.0}}, + &StatsInstance{cfg: &engine.StatsQueue{ID: "FOURTH", Weight: 35.0}}, + &StatsInstance{cfg: &engine.StatsQueue{ID: "FIRST", Weight: 30.0}}, + &StatsInstance{cfg: &engine.StatsQueue{ID: "THIRD", Weight: 30.0}}, + } + if !reflect.DeepEqual(eSInst, sInsts) { + t.Errorf("expecting: %+v, received: %+v", eSInst, sInsts) + } +}