diff --git a/engine/datamanager.go b/engine/datamanager.go
index d38cb417e..c7056143f 100644
--- a/engine/datamanager.go
+++ b/engine/datamanager.go
@@ -30,6 +30,11 @@ type DataManager struct {
 	dataDB DataDB
 }
 
+// DataDB exports access to dataDB
+func (dm *DataManager) DataDB() DataDB {
+	return dm.dataDB
+}
+
 // GetStatQueue retrieves a StatQueue from dataDB
 // handles caching and deserialization of metrics
 func (dm *DataManager) GetStatQueue(tenant, id string, skipCache bool, transactionID string) (sq *StatQueue, err error) {
diff --git a/engine/libstats.go b/engine/libstats.go
index 0a6c53189..8f9b6b5e6 100755
--- a/engine/libstats.go
+++ b/engine/libstats.go
@@ -151,7 +151,7 @@ type StatQueue struct {
 }
 
-// SqID will compose the unique identifier for the StatQueue out of Tenant and ID
-func (sq *StatQueue) SqID() string {
+// TenantID will compose the unique identifier for the StatQueue out of Tenant and ID
+func (sq *StatQueue) TenantID() string {
 	return utils.ConcatenatedKey(sq.Tenant, sq.ID)
 }
 
diff --git a/engine/resources.go b/engine/resources.go
index 78f7d2dc2..9c3efc9da 100755
--- a/engine/resources.go
+++ b/engine/resources.go
@@ -289,7 +289,7 @@ func (rS *ResourceService) storeResources() {
 			break // no more keys, backup completed
 		}
 		if rIf, ok := cache.Get(utils.ResourcesPrefix + rID); !ok || rIf == nil {
-			utils.Logger.Warning(fmt.Sprintf(" failed retrieving from cache resource with ID: %s"))
+			utils.Logger.Warning(fmt.Sprintf(" failed retrieving from cache resource with ID: %s", rID))
 		} else if err := rS.StoreResource(rIf.(*Resource)); err != nil {
 			failedRIDs = append(failedRIDs, rID) // record failure so we can schedule it for next backup
 		}
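Note: storeResources above and the new storeStats below share the same backup shape: pop one dirty ID at a time under the lock, persist it, and re-schedule failures for the next run. A minimal standalone sketch of that shape, with illustrative names that are not part of this patch:

package main

import "sync"

// backlog records IDs that still need to be persisted, analogous to the
// storedResources / storedStatQueues maps in the services above.
type backlog struct {
	mu    sync.Mutex
	dirty map[string]bool
}

// drain pops every pending ID, persists it via store, and re-queues the ones
// that failed so the next backup run can retry them.
func (b *backlog) drain(store func(id string) error) {
	var failed []string
	for {
		b.mu.Lock()
		var id string
		for k := range b.dirty { // take an arbitrary pending ID, like StringMap.GetOne
			id = k
			break
		}
		delete(b.dirty, id)
		b.mu.Unlock()
		if id == "" {
			break // nothing left to store, backup completed
		}
		if err := store(id); err != nil {
			failed = append(failed, id) // record failure so it is retried next time
		}
	}
	b.mu.Lock()
	for _, id := range failed {
		b.dirty[id] = true
	}
	b.mu.Unlock()
}

func main() {
	b := &backlog{dirty: map[string]bool{"cgrates.org:SQ1": true}}
	b.drain(func(id string) error { return nil }) // persisting is a no-op in this sketch
}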
diff --git a/engine/stats.go b/engine/stats.go
index 18b775b26..1221968b9 100644
--- a/engine/stats.go
+++ b/engine/stats.go
@@ -19,83 +19,139 @@ along with this program. If not, see 
 package engine
 
 import (
+	"fmt"
 	"math/rand"
+	"sync"
 	"time"
+
+	"github.com/cgrates/cgrates/cache"
+	"github.com/cgrates/cgrates/utils"
 )
 
-func init() {
-	rand.Seed(time.Now().UnixNano())
-}
-
-/*
 // NewStatService initializes a StatService
-func NewStatService(dataDB DataDB, ms Marshaler, storeInterval time.Duration) (ss *StatService, err error) {
-	ss = &StatService{dataDB: dataDB, ms: ms, storeInterval: storeInterval,
-		stopStoring: make(chan struct{})}
-	sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsConfigPrefix)
-	if err != nil {
-		return nil, err
-	}
-	go ss.dumpStoredMetrics() // start dumpStoredMetrics loop
-	return
+func NewStatService(dm *DataManager, storeInterval time.Duration) (ss *StatService, err error) {
+	return &StatService{dm: dm, storeInterval: storeInterval,
+		stopBackup: make(chan struct{})}, nil
 }
 
 // StatService builds stats for events
 type StatService struct {
-	dataDB        DataDB
-	ms            Marshaler
-	storeInterval time.Duration
-	stopStoring   chan struct{}
+	dm               *DataManager
+	storeInterval    time.Duration
+	stopBackup       chan struct{}
+	storedStatQueues utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool
+	ssqMux           sync.RWMutex    // protects storedStatQueues
 }
 
 // ListenAndServe loops keeps the service alive
-func (ss *StatService) ListenAndServe(exitChan chan bool) error {
+func (sS *StatService) ListenAndServe(exitChan chan bool) error {
+	go sS.runBackup() // start backup loop
 	e := <-exitChan
 	exitChan <- e // put back for the others listening for shutdown request
 	return nil
 }
 
-// Called to shutdown the service
-// ToDo: improve with context, ie following http implementation
-func (ss *StatService) Shutdown() error {
+// Shutdown is called to shutdown the service
+func (sS *StatService) Shutdown() error {
 	utils.Logger.Info(" service shutdown initialized")
-	close(ss.stopStoring)
-	ss.storeMetrics()
+	close(sS.stopBackup)
+	sS.storeStats()
 	utils.Logger.Info(" service shutdown complete")
 	return nil
 }
 
+// runBackup will regularly store stat queues changed to dataDB
+func (sS *StatService) runBackup() {
+	if sS.storeInterval <= 0 {
+		return
+	}
+	for {
+		select {
+		case <-sS.stopBackup:
+			return
+		case <-time.After(sS.storeInterval):
+		}
+		sS.storeStats()
+	}
+}
+
+// storeStats represents one task of complete backup
+func (sS *StatService) storeStats() {
+	var failedSqIDs []string
+	for { // don't stop until we store all dirty statQueues
+		sS.ssqMux.Lock()
+		sID := sS.storedStatQueues.GetOne()
+		if sID != "" {
+			delete(sS.storedStatQueues, sID)
+		}
+		sS.ssqMux.Unlock()
+		if sID == "" {
+			break // no more keys, backup completed
+		}
+		if sqIf, ok := cache.Get(utils.StatQueuePrefix + sID); !ok || sqIf == nil {
+			utils.Logger.Warning(fmt.Sprintf(" failed retrieving from cache stat queue with ID: %s", sID))
+		} else if err := sS.StoreStatQueue(sqIf.(*StatQueue)); err != nil {
+			failedSqIDs = append(failedSqIDs, sID) // record failure so we can schedule it for next backup
+		}
+		// randomize the CPU load and give up thread control
+		time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond)
+	}
+	if len(failedSqIDs) != 0 { // there were errors on save, schedule the keys for next backup
+		sS.ssqMux.Lock()
+		for _, sqID := range failedSqIDs {
+			sS.storedStatQueues[sqID] = true
+		}
+		sS.ssqMux.Unlock()
+	}
+}
+
+// StoreStatQueue stores the statQueue in DB and corrects dirty flag
+func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) {
+	if sq.dirty == nil || !*sq.dirty {
+		return
+	}
+	if err = sS.dm.SetStatQueue(sq); err != nil {
+		utils.Logger.Warning(
+			fmt.Sprintf(" failed saving StatQueue with ID: %s, error: %s",
+				sq.TenantID(), err.Error()))
+	} else {
+		*sq.dirty = false
+	}
+	return
+}
+
+/*
 // setQueue adds or modifies a queue into cache
-// sort will reorder the ss.queues
+// sort will reorder the sS.queues
 func (ss *StatService) loadQueue(qID string) (q *StatQueue, err error) {
-	sq, err := ss.dataDB.GetStatsConfig(qID)
+	sq, err := sS.dataDB.GetStatsConfig(qID)
 	if err != nil {
 		return nil, err
 	}
-	return NewStatQueue(ss.evCache, ss.ms, sq, sqSM)
+	return NewStatQueue(sS.evCache, sS.ms, sq, sqSM)
 }
 
 func (ss *StatService) setQueue(q *StatQueue) {
-	ss.queuesCache[q.cfg.ID] = q
-	ss.queues = append(ss.queues, q)
+	sS.queuesCache[q.cfg.ID] = q
+	sS.queues = append(sS.queues, q)
 }
 
 // remQueue will remove a queue based on it's ID
 func (ss *StatService) remQueue(qID string) (si *StatQueue) {
-	si = ss.queuesCache[qID]
-	ss.queues.remWithID(qID)
-	delete(ss.queuesCache, qID)
+	si = sS.queuesCache[qID]
+	sS.queues.remWithID(qID)
+	delete(sS.queuesCache, qID)
 	return
 }
 
 // store stores the necessary storedMetrics to dataDB
 func (ss *StatService) storeMetrics() {
-	for _, si := range ss.queues {
+	for _, si := range sS.queues {
 		if !si.cfg.Store || !si.dirty { // no need to save
 			continue
 		}
 		if siSM := si.GetStoredMetrics(); siSM != nil {
-			if err := ss.dataDB.SetSQStoredMetrics(siSM); err != nil {
+			if err := sS.dataDB.SetSQStoredMetrics(siSM); err != nil {
 				utils.Logger.Warning(
 					fmt.Sprintf(" failed saving StoredMetrics for QueueID: %s, error: %s",
 						si.cfg.ID, err.Error()))
@@ -111,11 +167,11 @@ func (ss *StatService) storeMetrics() {
 
 func (ss *StatService) dumpStoredMetrics() {
 	for {
 		select {
-		case <-ss.stopStoring:
+		case <-sS.stopStoring:
 			return
 		}
-		ss.storeMetrics()
-		
time.Sleep(ss.storeInterval) + sS.storeMetrics() + time.Sleep(sS.storeInterval) } } @@ -125,7 +181,7 @@ func (ss *StatService) processEvent(ev StatsEvent) (err error) { if evStatsID == "" { // ID is mandatory return errors.New("missing ID field") } - for _, stInst := range ss.queues { + for _, stInst := range sS.queues { if err := stInst.ProcessEvent(ev); err != nil { utils.Logger.Warning( fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s", @@ -140,7 +196,7 @@ func (ss *StatService) processEvent(ev StatsEvent) (err error) { // V1ProcessEvent implements StatV1 method for processing an Event func (ss *StatService) V1ProcessEvent(ev StatsEvent, reply *string) (err error) { - if err = ss.processEvent(ev); err == nil { + if err = sS.processEvent(ev); err == nil { *reply = utils.OK } return @@ -148,10 +204,10 @@ func (ss *StatService) V1ProcessEvent(ev StatsEvent, reply *string) (err error) // V1GetQueueIDs returns list of queue IDs configured in the service func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err error) { - if len(ss.queuesCache) == 0 { + if len(sS.queuesCache) == 0 { return utils.ErrNotFound } - for k := range ss.queuesCache { + for k := range sS.queuesCache { *reply = append(*reply, k) } return @@ -159,7 +215,7 @@ func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err err // V1GetStringMetrics returns the metrics as string values func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]string) (err error) { - sq, has := ss.queuesCache[queueID] + sq, has := sS.queuesCache[queueID] if !has { return utils.ErrNotFound } @@ -173,7 +229,7 @@ func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]stri // V1GetFloatMetrics returns the metrics as float64 values func (ss *StatService) V1GetFloatMetrics(queueID string, reply *map[string]float64) (err error) { - sq, has := ss.queuesCache[queueID] + sq, has := sS.queuesCache[queueID] if !has { return utils.ErrNotFound } @@ -195,7 +251,7 @@ type ArgsLoadQueues struct { func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) { qIDs := args.QueueIDs if qIDs == nil { - sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsConfigPrefix) + sqPrfxs, err := sS.dataDB.GetKeysForPrefix(utils.StatsConfigPrefix) if err != nil { return err } @@ -212,10 +268,10 @@ func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err err } var sQs []*StatQueue // cache here so we lock only later when data available for _, qID := range *qIDs { - if _, hasPrev := ss.queuesCache[qID]; hasPrev { + if _, hasPrev := sS.queuesCache[qID]; hasPrev { continue // don't overwrite previous, could be extended in the future by carefully checking cached events } - if q, err := ss.loadQueue(qID); err != nil { + if q, err := sS.loadQueue(qID); err != nil { utils.Logger.Err(fmt.Sprintf(" failed loading quueue with id: <%s>, err: <%s>", q.cfg.ID, err.Error())) continue @@ -223,12 +279,12 @@ func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err err sQs = append(sQs, q) } } - ss.Lock() + sS.Lock() for _, q := range sQs { - ss.setQueue(q) + sS.setQueue(q) } - ss.queues.Sort() - ss.Unlock() + sS.queues.Sort() + sS.Unlock() *reply = utils.OK return }
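The backup loop introduced in runBackup above follows a common shape: flush on a timer until a stop channel closes, with the final flush left to the caller (here Shutdown). A standalone sketch of that shape, using illustrative names that are not part of this patch:

package main

import (
	"fmt"
	"time"
)

// runPeriodicFlush calls flush every interval until stop is closed; a
// non-positive interval disables the loop, mirroring storeInterval <= 0 above.
func runPeriodicFlush(interval time.Duration, stop <-chan struct{}, flush func()) {
	if interval <= 0 {
		return
	}
	for {
		select {
		case <-stop:
			return
		case <-time.After(interval):
		}
		flush()
	}
}

func main() {
	stop := make(chan struct{})
	flush := func() { fmt.Println("flush") }
	go runPeriodicFlush(50*time.Millisecond, stop, flush)
	time.Sleep(200 * time.Millisecond)
	close(stop) // stop the loop ...
	flush()     // ... then do one final flush, as Shutdown does with storeStats
}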