/* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ package stats import ( "fmt" "sort" "sync" "time" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) // StatsInstances is a sortable list of StatsInstance type StatsInstances []*StatsInstance // Sort is part of sort interface, sort based on Weight func (sis StatsInstances) Sort() { sort.Slice(sis, func(i, j int) bool { return sis[i].cfg.Weight > sis[j].cfg.Weight }) } // remWithID removes the queue with ID from slice func (sis StatsInstances) remWithID(qID string) { for i, q := range sis { if q.cfg.ID == qID { copy(sis[i:], sis[i+1:]) sis[len(sis)-1] = nil sis = sis[:len(sis)-1] break // there can be only one item with ID } } } // NewStatsInstance instantiates a StatsInstance func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler, sqCfg *engine.StatsQueue, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) { si = &StatsInstance{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)} for _, metricID := range sqCfg.Metrics { if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { return } } if sqSM != nil { for evID, ev := range sqSM.SEvents { si.sec.Cache(evID, ev, si.cfg.ID) } si.sqItems = sqSM.SQItems for metricID := range si.sqMetrics { if _, has := si.sqMetrics[metricID]; !has { if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { return } } if stored, has := sqSM.SQMetrics[metricID]; !has { continue } else if err = si.sqMetrics[metricID].SetFromMarshaled(stored, ms); err != nil { return } } } return } // StatsInstance represents an individual stats instance type StatsInstance struct { sync.RWMutex dirty bool // needs save sec *StatsEventCache sqItems []*engine.SQItem sqMetrics map[string]StatsMetric ms engine.Marshaler // used to get/set Metrics cfg *engine.StatsQueue } // GetSQStoredMetrics retrieves the data used for store to DB func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) { sq.RLock() defer sq.RUnlock() sEvents := make(map[string]engine.StatsEvent) var sItems []*engine.SQItem for _, sqItem := range sq.sqItems { // make sure event is properly retrieved from cache ev := sq.sec.GetEvent(sqItem.EventID) if ev == nil { utils.Logger.Warning(fmt.Sprintf(" querying for storage eventID: %s, error: event not cached", sqItem.EventID)) continue } sEvents[sqItem.EventID] = ev sItems = append(sItems, sqItem) } sqSM = &engine.SQStoredMetrics{ SEvents: sEvents, SQItems: sItems, SQMetrics: make(map[string][]byte, len(sq.sqMetrics))} for metricID, metric := range sq.sqMetrics { var err error if sqSM.SQMetrics[metricID], err = metric.GetMarshaled(sq.ms); err != nil { utils.Logger.Warning(fmt.Sprintf(" querying for storage metricID: %s, error: %s", metricID, err.Error())) continue } } return } // ProcessEvent processes a StatsEvent, returns true if processed func (sq *StatsInstance) ProcessEvent(ev engine.StatsEvent) (err error) { sq.Lock() sq.remExpired() sq.remOnQueueLength() sq.addStatsEvent(ev) sq.Unlock() return } // remExpired expires items in queue func (sq *StatsInstance) remExpired() { var expIdx *int // index of last item to be expired for i, item := range sq.sqItems { if item.ExpiryTime == nil { break } if item.ExpiryTime.After(time.Now()) { break } sq.remEventWithID(item.EventID) item = nil // garbage collected asap expIdx = &i } if expIdx == nil { return } nextValidIdx := *expIdx + 1 sq.sqItems = sq.sqItems[nextValidIdx:] } // remOnQueueLength rems elements based on QueueLength setting func (sq *StatsInstance) remOnQueueLength() { if sq.cfg.QueueLength == 0 { return } if len(sq.sqItems) == sq.cfg.QueueLength { // reached limit, rem first element itm := sq.sqItems[0] sq.remEventWithID(itm.EventID) itm = nil sq.sqItems = sq.sqItems[1:] } } // addStatsEvent computes metrics for an event func (sq *StatsInstance) addStatsEvent(ev engine.StatsEvent) { evID := ev.ID() for metricID, metric := range sq.sqMetrics { if err := metric.AddEvent(ev); err != nil { utils.Logger.Warning(fmt.Sprintf(" metricID: %s, add eventID: %s, error: %s", metricID, evID, err.Error())) } } } // remStatsEvent removes an event from metrics func (sq *StatsInstance) remEventWithID(evID string) { ev := sq.sec.GetEvent(evID) if ev == nil { utils.Logger.Warning(fmt.Sprintf(" removing eventID: %s, error: event not cached", evID)) return } for metricID, metric := range sq.sqMetrics { if err := metric.RemEvent(ev); err != nil { utils.Logger.Warning(fmt.Sprintf(" metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error())) } } }