From 7c11ebde714a857710b76f4a8ad8a54b0ad7b339 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 9 Jul 2017 15:27:29 +0200 Subject: [PATCH] StatsService basic functions --- data/tariffplans/tutorial/Stats.csv | 4 +- engine/{stats_queue.go => cdrstats_queue.go} | 0 engine/{stats_test.go => cdrstats_test.go} | 0 engine/statservice.go | 64 +++++++ engine/{stats.go => statsevent.go} | 23 ++- engine/statseventcache.go | 71 ++++++++ engine/statsmetrics.go | 103 ++++++++++++ engine/statsqueue.go | 165 +++++++++++++++++++ utils/consts.go | 3 + 9 files changed, 418 insertions(+), 15 deletions(-) rename engine/{stats_queue.go => cdrstats_queue.go} (100%) rename engine/{stats_test.go => cdrstats_test.go} (100%) create mode 100644 engine/statservice.go rename engine/{stats.go => statsevent.go} (71%) create mode 100644 engine/statseventcache.go create mode 100644 engine/statsmetrics.go create mode 100644 engine/statsqueue.go diff --git a/data/tariffplans/tutorial/Stats.csv b/data/tariffplans/tutorial/Stats.csv index 938885b88..723700cc4 100644 --- a/data/tariffplans/tutorial/Stats.csv +++ b/data/tariffplans/tutorial/Stats.csv @@ -1,2 +1,2 @@ -#Id,ActivationInterval,FilterType,FilterFieldName,FilterFieldValues,QueueLength,TTL,Metrics,Thresholds,Weight -Stats1,2014-07-29T15:00:00Z,*string,Account,1001;1002,100,1s,*asr;*acd;*acc,THRESH1;THRESH2,20 +#Id,ActivationInterval,FilterType,FilterFieldName,FilterFieldValues,QueueLength,TTL,Metrics,Store,Thresholds,Weight +Stats1,2014-07-29T15:00:00Z,*string,Account,1001;1002,100,1s,*asr;*acd;*acc,true,THRESH1;THRESH2,20 diff --git a/engine/stats_queue.go b/engine/cdrstats_queue.go similarity index 100% rename from engine/stats_queue.go rename to engine/cdrstats_queue.go diff --git a/engine/stats_test.go b/engine/cdrstats_test.go similarity index 100% rename from engine/stats_test.go rename to engine/cdrstats_test.go diff --git a/engine/statservice.go b/engine/statservice.go new file mode 100644 index 000000000..367d43fcb --- /dev/null +++ b/engine/statservice.go @@ -0,0 +1,64 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package engine + +import ( + "errors" + "fmt" + "sync" + + "github.com/cgrates/cgrates/utils" +) + +// StatService builds stats for events +type StatService struct { + sync.RWMutex + dataDB DataDB + stQueues []*StatsQueue // ordered list of StatsQueues + evCache *StatsEventCache // so we can pass it to queues +} + +// Called to start the service +func (ss *StatService) ListenAndServe() error { + return nil +} + +// Called to shutdown the service +func (ss *StatService) ServiceShutdown() error { + return nil +} + +// processEvent processes a StatsEvent through the queues and caches it when needed +func (ss *StatService) processEvent(ev StatsEvent) (err error) { + evStatsID := ev.ID() + if evStatsID == "" { // ID is mandatory + return errors.New("missing ID field") + } + for _, stQ := range ss.stQueues { + if err := stQ.ProcessEvent(ev); err != nil { + utils.Logger.Warning(fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s", + stQ.ID, evStatsID, err.Error())) + } + } + return +} + +// store stores the necessary data to DB +func (ss *StatService) store() (err error) { + return +} diff --git a/engine/stats.go b/engine/statsevent.go similarity index 71% rename from engine/stats.go rename to engine/statsevent.go index de87c1b48..705ba0b3a 100644 --- a/engine/stats.go +++ b/engine/statsevent.go @@ -17,19 +17,16 @@ along with this program. If not, see */ package engine -type EventStatsQueue struct{} +import ( + "github.com/cgrates/cgrates/utils" +) -type StatService struct { - dataDB DataDB - stQueues []*EventStatsQueue -} +// StatsEvent is an event received by StatService +type StatsEvent map[string]interface{} -// Called to start the service -func (ss *StatService) ListenAndServe() error { - return nil -} - -// Called to shutdown the service -func (ss *StatService) ServiceShutdown() error { - return nil +func (se StatsEvent) ID() (id string) { + if sID, has := se[utils.ID]; has { + id = sID.(string) + } + return } diff --git a/engine/statseventcache.go b/engine/statseventcache.go new file mode 100644 index 000000000..7fb1131fd --- /dev/null +++ b/engine/statseventcache.go @@ -0,0 +1,71 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package engine + +import ( + "sync" + + "github.com/cgrates/cgrates/utils" +) + +// NewStatsEventCache instantiates a StatsEventCache +func NewStatsEventCache() *StatsEventCache { + return &StatsEventCache{ + evCacheIdx: make(map[string]utils.StringMap), + evCache: make(map[string]StatsEvent)} +} + +// StatsEventCache keeps a cache of StatsEvents which are referenced by StatsQueues +type StatsEventCache struct { + sync.RWMutex + evCacheIdx map[string]utils.StringMap // index events used in queues, map[eventID]map[queueID]bool + evCache map[string]StatsEvent // cache for the processed events +} + +// Cache will cache an event and reference it in the index +func (sec *StatsEventCache) Cache(evID string, ev StatsEvent, queueID string) { + if utils.IsSliceMember([]string{evID, queueID}, "") { + return + } + sec.Lock() + if _, hasIt := sec.evCache[evID]; !hasIt { + sec.evCache[evID] = ev + } + sec.evCacheIdx[evID][queueID] = true + sec.Unlock() +} + +func (sec *StatsEventCache) UnCache(evID string, ev StatsEvent, queueID string) { + sec.Lock() + if _, hasIt := sec.evCache[evID]; !hasIt { + return + } + delete(sec.evCacheIdx[evID], queueID) + if len(sec.evCacheIdx[evID]) == 0 { + delete(sec.evCacheIdx, evID) + delete(sec.evCache, evID) + } + sec.Unlock() +} + +// GetEvent returns the event based on ID +func (sec *StatsEventCache) GetEvent(evID string) StatsEvent { + sec.RLock() + defer sec.RUnlock() + return sec.evCache[evID] +} diff --git a/engine/statsmetrics.go b/engine/statsmetrics.go new file mode 100644 index 000000000..e478557d2 --- /dev/null +++ b/engine/statsmetrics.go @@ -0,0 +1,103 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "fmt" + + "github.com/cgrates/cgrates/utils" +) + +// NewStatsMetrics instantiates the StatsMetrics +func NewStatsMetric(metricID string) (sm StatsMetric, err error) { + metrics := map[string]func() (StatsMetric, error){ + utils.MetaASR: NewStatsASR, + utils.MetaACD: NewStatsACD, + } + if _, has := metrics[metricID]; !has { + return nil, fmt.Errorf("unsupported metric: %s", metricID) + } + return metrics[metricID]() +} + +// StatsMetric is the interface which a metric should implement +type StatsMetric interface { + getStringValue() string + addEvent(ev StatsEvent) error + remEvent(ev StatsEvent) error + getStoredValues() ([]byte, error) // used to generate the values which are stored into DB + loadStoredValues([]byte) error // load the values from DB +} + +func NewStatsASR() (StatsMetric, error) { + return new(StatsASR), nil +} + +// StatsASR implements AverageSuccessRatio metric +type StatsASR struct { + answered int + count int +} + +func (asr *StatsASR) getStringValue() (val string) { + return +} + +func (asr *StatsASR) addEvent(ev StatsEvent) (err error) { + return +} + +func (asr *StatsASR) remEvent(ev StatsEvent) (err error) { + return +} + +func (asr *StatsASR) getStoredValues() (vals []byte, err error) { + return +} + +func (asr *StatsASR) loadStoredValues(vals []byte) (err error) { + return +} + +func NewStatsACD() (StatsMetric, error) { + return new(StatsACD), nil +} + +// StatsACD implements AverageCallDuration metric +type StatsACD struct{} + +func (acd *StatsACD) getStringValue() (val string) { + return +} + +func (acd *StatsACD) addEvent(ev StatsEvent) (err error) { + return +} + +func (acd *StatsACD) remEvent(ev StatsEvent) (err error) { + return +} + +func (asr *StatsACD) getStoredValues() (vals []byte, err error) { + return +} + +func (asr *StatsACD) loadStoredValues(vals []byte) (err error) { + return +} diff --git a/engine/statsqueue.go b/engine/statsqueue.go new file mode 100644 index 000000000..28e25269e --- /dev/null +++ b/engine/statsqueue.go @@ -0,0 +1,165 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package engine + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/cgrates/cgrates/utils" +) + +// SQItem represents one item in the stats queue +type SQItem struct { + EventID string // Bounded to the original StatsEvent + ExpiryTime *time.Time // Used to auto-expire events +} + +// SQStored contains values saved in DB on store +type StoredSQ struct { + SQItems []*SQItem + SQMetrics map[string][]byte +} + +// StatsQueue represents an individual stats instance +type StatsQueue struct { + sync.RWMutex + dirty bool // needs save + sec *StatsEventCache + sqItems []*SQItem + sqMetrics map[string]StatsMetric + + ID string // QueueID + ActivationInterval *utils.ActivationInterval // Activation interval + Filters []*RequestFilter + QueueLength int + TTL time.Duration + Metrics []string // list of metrics to build + Store bool // store to DB + Thresholds []string // list of thresholds to be checked after changes +} + +// Init prepares a StatsQueue for operations +// Should be executed at server start +func (sq *StatsQueue) Init(sec *StatsEventCache, storedSQ *StoredSQ) (err error) { + sq.sec = sec + if storedSQ == nil { + return + } + sq.sqItems = storedSQ.SQItems + for metricID := range sq.sqMetrics { + if sq.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { + return + } + if stored, has := storedSQ.SQMetrics[metricID]; !has { + continue + } else if err = sq.sqMetrics[metricID].loadStoredValues(stored); err != nil { + return + } + } + return +} + +// GetStoredSQ retrieves the data used for store to DB +func (sq *StatsQueue) GetStoredSQ() (sSQ *StoredSQ, err error) { + sq.RLock() + defer sq.RUnlock() + if !sq.Store { + return nil, errors.New("not storable") + } + sSQ = &StoredSQ{ + SQItems: sq.sqItems, + SQMetrics: make(map[string][]byte, len(sq.sqMetrics))} + for metricID, metric := range sq.sqMetrics { + if sSQ.SQMetrics[metricID], err = metric.getStoredValues(); err != nil { + return nil, err + } + } + return +} + +// remExpired expires items in queue +func (sq *StatsQueue) remExpired() { + var expIdx *int // index of last item to be expired + for i, item := range sq.sqItems { + if item.ExpiryTime == nil { + break + } + if item.ExpiryTime.After(time.Now()) { + break + } + sq.remEventWithID(item.EventID) + item = nil // garbage collected asap + expIdx = &i + } + if expIdx == nil { + return + } + nextValidIdx := *expIdx + 1 + sq.sqItems = sq.sqItems[nextValidIdx:] +} + +// remOnQueueLength rems elements based on QueueLength setting +func (sq *StatsQueue) remOnQueueLength() { + if sq.QueueLength == 0 { + return + } + if len(sq.sqItems) == sq.QueueLength { // reached limit, rem first element + itm := sq.sqItems[0] + sq.remEventWithID(itm.EventID) + itm = nil + sq.sqItems = sq.sqItems[1:] + } +} + +// addStatsEvent computes metrics for an event +func (sq *StatsQueue) addStatsEvent(ev StatsEvent) { + evID := ev.ID() + for metricID, metric := range sq.sqMetrics { + if err := metric.addEvent(ev); err != nil { + utils.Logger.Warning(fmt.Sprintf(" metricID: %s, add eventID: %s, error: %s", + metricID, evID, err.Error())) + } + } +} + +// remStatsEvent removes an event from metrics +func (sq *StatsQueue) remEventWithID(evID string) { + ev := sq.sec.GetEvent(evID) + if ev == nil { + utils.Logger.Warning(fmt.Sprintf(" removing eventID: %s, error: event not cached", evID)) + return + } + for metricID, metric := range sq.sqMetrics { + if err := metric.remEvent(ev); err != nil { + utils.Logger.Warning(fmt.Sprintf(" metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error())) + } + } +} + +// ProcessEvent processes a StatsEvent, returns true if processed +func (sq *StatsQueue) ProcessEvent(ev StatsEvent) (err error) { + sq.Lock() + sq.remExpired() + sq.remOnQueueLength() + sq.addStatsEvent(ev) + sq.Unlock() + return +} diff --git a/utils/consts.go b/utils/consts.go index a1fcce6bb..978a20aff 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -373,4 +373,7 @@ const ( Accounts = "Accounts" MetaEveryMinute = "*every_minute" MetaHourly = "*hourly" + ID = "ID" + MetaASR = "*asr" + MetaACD = "*acd" )