diff --git a/data/tariffplans/tutorial/Stats.csv b/data/tariffplans/tutorial/Stats.csv
index 938885b88..723700cc4 100644
--- a/data/tariffplans/tutorial/Stats.csv
+++ b/data/tariffplans/tutorial/Stats.csv
@@ -1,2 +1,2 @@
-#Id,ActivationInterval,FilterType,FilterFieldName,FilterFieldValues,QueueLength,TTL,Metrics,Thresholds,Weight
-Stats1,2014-07-29T15:00:00Z,*string,Account,1001;1002,100,1s,*asr;*acd;*acc,THRESH1;THRESH2,20
+#Id,ActivationInterval,FilterType,FilterFieldName,FilterFieldValues,QueueLength,TTL,Metrics,Store,Thresholds,Weight
+Stats1,2014-07-29T15:00:00Z,*string,Account,1001;1002,100,1s,*asr;*acd;*acc,true,THRESH1;THRESH2,20
diff --git a/engine/stats_queue.go b/engine/cdrstats_queue.go
similarity index 100%
rename from engine/stats_queue.go
rename to engine/cdrstats_queue.go
diff --git a/engine/stats_test.go b/engine/cdrstats_test.go
similarity index 100%
rename from engine/stats_test.go
rename to engine/cdrstats_test.go
diff --git a/engine/statservice.go b/engine/statservice.go
new file mode 100644
index 000000000..367d43fcb
--- /dev/null
+++ b/engine/statservice.go
@@ -0,0 +1,64 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package engine
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+
+ "github.com/cgrates/cgrates/utils"
+)
+
+// StatService builds stats for events
+type StatService struct {
+ sync.RWMutex
+ dataDB DataDB
+ stQueues []*StatsQueue // ordered list of StatsQueues
+ evCache *StatsEventCache // so we can pass it to queues
+}
+
+// Called to start the service
+func (ss *StatService) ListenAndServe() error {
+ return nil
+}
+
+// Called to shutdown the service
+func (ss *StatService) ServiceShutdown() error {
+ return nil
+}
+
+// processEvent processes a StatsEvent through the queues and caches it when needed
+func (ss *StatService) processEvent(ev StatsEvent) (err error) {
+ evStatsID := ev.ID()
+ if evStatsID == "" { // ID is mandatory
+ return errors.New("missing ID field")
+ }
+ for _, stQ := range ss.stQueues {
+ if err := stQ.ProcessEvent(ev); err != nil {
+ utils.Logger.Warning(fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s",
+ stQ.ID, evStatsID, err.Error()))
+ }
+ }
+ return
+}
+
+// store stores the necessary data to DB
+func (ss *StatService) store() (err error) {
+ return
+}
diff --git a/engine/stats.go b/engine/statsevent.go
similarity index 71%
rename from engine/stats.go
rename to engine/statsevent.go
index de87c1b48..705ba0b3a 100644
--- a/engine/stats.go
+++ b/engine/statsevent.go
@@ -17,19 +17,16 @@ along with this program. If not, see
*/
package engine
-type EventStatsQueue struct{}
+import (
+ "github.com/cgrates/cgrates/utils"
+)
-type StatService struct {
- dataDB DataDB
- stQueues []*EventStatsQueue
-}
+// StatsEvent is an event received by StatService
+type StatsEvent map[string]interface{}
-// Called to start the service
-func (ss *StatService) ListenAndServe() error {
- return nil
-}
-
-// Called to shutdown the service
-func (ss *StatService) ServiceShutdown() error {
- return nil
+func (se StatsEvent) ID() (id string) {
+ if sID, has := se[utils.ID]; has {
+ id = sID.(string)
+ }
+ return
}
diff --git a/engine/statseventcache.go b/engine/statseventcache.go
new file mode 100644
index 000000000..7fb1131fd
--- /dev/null
+++ b/engine/statseventcache.go
@@ -0,0 +1,71 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package engine
+
+import (
+ "sync"
+
+ "github.com/cgrates/cgrates/utils"
+)
+
+// NewStatsEventCache instantiates a StatsEventCache
+func NewStatsEventCache() *StatsEventCache {
+ return &StatsEventCache{
+ evCacheIdx: make(map[string]utils.StringMap),
+ evCache: make(map[string]StatsEvent)}
+}
+
+// StatsEventCache keeps a cache of StatsEvents which are referenced by StatsQueues
+type StatsEventCache struct {
+ sync.RWMutex
+ evCacheIdx map[string]utils.StringMap // index events used in queues, map[eventID]map[queueID]bool
+ evCache map[string]StatsEvent // cache for the processed events
+}
+
+// Cache will cache an event and reference it in the index
+func (sec *StatsEventCache) Cache(evID string, ev StatsEvent, queueID string) {
+ if utils.IsSliceMember([]string{evID, queueID}, "") {
+ return
+ }
+ sec.Lock()
+ if _, hasIt := sec.evCache[evID]; !hasIt {
+ sec.evCache[evID] = ev
+ }
+ sec.evCacheIdx[evID][queueID] = true
+ sec.Unlock()
+}
+
+func (sec *StatsEventCache) UnCache(evID string, ev StatsEvent, queueID string) {
+ sec.Lock()
+ if _, hasIt := sec.evCache[evID]; !hasIt {
+ return
+ }
+ delete(sec.evCacheIdx[evID], queueID)
+ if len(sec.evCacheIdx[evID]) == 0 {
+ delete(sec.evCacheIdx, evID)
+ delete(sec.evCache, evID)
+ }
+ sec.Unlock()
+}
+
+// GetEvent returns the event based on ID
+func (sec *StatsEventCache) GetEvent(evID string) StatsEvent {
+ sec.RLock()
+ defer sec.RUnlock()
+ return sec.evCache[evID]
+}
diff --git a/engine/statsmetrics.go b/engine/statsmetrics.go
new file mode 100644
index 000000000..e478557d2
--- /dev/null
+++ b/engine/statsmetrics.go
@@ -0,0 +1,103 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package engine
+
+import (
+ "fmt"
+
+ "github.com/cgrates/cgrates/utils"
+)
+
+// NewStatsMetrics instantiates the StatsMetrics
+func NewStatsMetric(metricID string) (sm StatsMetric, err error) {
+ metrics := map[string]func() (StatsMetric, error){
+ utils.MetaASR: NewStatsASR,
+ utils.MetaACD: NewStatsACD,
+ }
+ if _, has := metrics[metricID]; !has {
+ return nil, fmt.Errorf("unsupported metric: %s", metricID)
+ }
+ return metrics[metricID]()
+}
+
+// StatsMetric is the interface which a metric should implement
+type StatsMetric interface {
+ getStringValue() string
+ addEvent(ev StatsEvent) error
+ remEvent(ev StatsEvent) error
+ getStoredValues() ([]byte, error) // used to generate the values which are stored into DB
+ loadStoredValues([]byte) error // load the values from DB
+}
+
+func NewStatsASR() (StatsMetric, error) {
+ return new(StatsASR), nil
+}
+
+// StatsASR implements AverageSuccessRatio metric
+type StatsASR struct {
+ answered int
+ count int
+}
+
+func (asr *StatsASR) getStringValue() (val string) {
+ return
+}
+
+func (asr *StatsASR) addEvent(ev StatsEvent) (err error) {
+ return
+}
+
+func (asr *StatsASR) remEvent(ev StatsEvent) (err error) {
+ return
+}
+
+func (asr *StatsASR) getStoredValues() (vals []byte, err error) {
+ return
+}
+
+func (asr *StatsASR) loadStoredValues(vals []byte) (err error) {
+ return
+}
+
+func NewStatsACD() (StatsMetric, error) {
+ return new(StatsACD), nil
+}
+
+// StatsACD implements AverageCallDuration metric
+type StatsACD struct{}
+
+func (acd *StatsACD) getStringValue() (val string) {
+ return
+}
+
+func (acd *StatsACD) addEvent(ev StatsEvent) (err error) {
+ return
+}
+
+func (acd *StatsACD) remEvent(ev StatsEvent) (err error) {
+ return
+}
+
+func (asr *StatsACD) getStoredValues() (vals []byte, err error) {
+ return
+}
+
+func (asr *StatsACD) loadStoredValues(vals []byte) (err error) {
+ return
+}
diff --git a/engine/statsqueue.go b/engine/statsqueue.go
new file mode 100644
index 000000000..28e25269e
--- /dev/null
+++ b/engine/statsqueue.go
@@ -0,0 +1,165 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package engine
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/cgrates/cgrates/utils"
+)
+
+// SQItem represents one item in the stats queue
+type SQItem struct {
+ EventID string // Bounded to the original StatsEvent
+ ExpiryTime *time.Time // Used to auto-expire events
+}
+
+// SQStored contains values saved in DB on store
+type StoredSQ struct {
+ SQItems []*SQItem
+ SQMetrics map[string][]byte
+}
+
+// StatsQueue represents an individual stats instance
+type StatsQueue struct {
+ sync.RWMutex
+ dirty bool // needs save
+ sec *StatsEventCache
+ sqItems []*SQItem
+ sqMetrics map[string]StatsMetric
+
+ ID string // QueueID
+ ActivationInterval *utils.ActivationInterval // Activation interval
+ Filters []*RequestFilter
+ QueueLength int
+ TTL time.Duration
+ Metrics []string // list of metrics to build
+ Store bool // store to DB
+ Thresholds []string // list of thresholds to be checked after changes
+}
+
+// Init prepares a StatsQueue for operations
+// Should be executed at server start
+func (sq *StatsQueue) Init(sec *StatsEventCache, storedSQ *StoredSQ) (err error) {
+ sq.sec = sec
+ if storedSQ == nil {
+ return
+ }
+ sq.sqItems = storedSQ.SQItems
+ for metricID := range sq.sqMetrics {
+ if sq.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil {
+ return
+ }
+ if stored, has := storedSQ.SQMetrics[metricID]; !has {
+ continue
+ } else if err = sq.sqMetrics[metricID].loadStoredValues(stored); err != nil {
+ return
+ }
+ }
+ return
+}
+
+// GetStoredSQ retrieves the data used for store to DB
+func (sq *StatsQueue) GetStoredSQ() (sSQ *StoredSQ, err error) {
+ sq.RLock()
+ defer sq.RUnlock()
+ if !sq.Store {
+ return nil, errors.New("not storable")
+ }
+ sSQ = &StoredSQ{
+ SQItems: sq.sqItems,
+ SQMetrics: make(map[string][]byte, len(sq.sqMetrics))}
+ for metricID, metric := range sq.sqMetrics {
+ if sSQ.SQMetrics[metricID], err = metric.getStoredValues(); err != nil {
+ return nil, err
+ }
+ }
+ return
+}
+
+// remExpired expires items in queue
+func (sq *StatsQueue) remExpired() {
+ var expIdx *int // index of last item to be expired
+ for i, item := range sq.sqItems {
+ if item.ExpiryTime == nil {
+ break
+ }
+ if item.ExpiryTime.After(time.Now()) {
+ break
+ }
+ sq.remEventWithID(item.EventID)
+ item = nil // garbage collected asap
+ expIdx = &i
+ }
+ if expIdx == nil {
+ return
+ }
+ nextValidIdx := *expIdx + 1
+ sq.sqItems = sq.sqItems[nextValidIdx:]
+}
+
+// remOnQueueLength rems elements based on QueueLength setting
+func (sq *StatsQueue) remOnQueueLength() {
+ if sq.QueueLength == 0 {
+ return
+ }
+ if len(sq.sqItems) == sq.QueueLength { // reached limit, rem first element
+ itm := sq.sqItems[0]
+ sq.remEventWithID(itm.EventID)
+ itm = nil
+ sq.sqItems = sq.sqItems[1:]
+ }
+}
+
+// addStatsEvent computes metrics for an event
+func (sq *StatsQueue) addStatsEvent(ev StatsEvent) {
+ evID := ev.ID()
+ for metricID, metric := range sq.sqMetrics {
+ if err := metric.addEvent(ev); err != nil {
+ utils.Logger.Warning(fmt.Sprintf(" metricID: %s, add eventID: %s, error: %s",
+ metricID, evID, err.Error()))
+ }
+ }
+}
+
+// remStatsEvent removes an event from metrics
+func (sq *StatsQueue) remEventWithID(evID string) {
+ ev := sq.sec.GetEvent(evID)
+ if ev == nil {
+ utils.Logger.Warning(fmt.Sprintf(" removing eventID: %s, error: event not cached", evID))
+ return
+ }
+ for metricID, metric := range sq.sqMetrics {
+ if err := metric.remEvent(ev); err != nil {
+ utils.Logger.Warning(fmt.Sprintf(" metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error()))
+ }
+ }
+}
+
+// ProcessEvent processes a StatsEvent, returns true if processed
+func (sq *StatsQueue) ProcessEvent(ev StatsEvent) (err error) {
+ sq.Lock()
+ sq.remExpired()
+ sq.remOnQueueLength()
+ sq.addStatsEvent(ev)
+ sq.Unlock()
+ return
+}
diff --git a/utils/consts.go b/utils/consts.go
index a1fcce6bb..978a20aff 100644
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -373,4 +373,7 @@ const (
Accounts = "Accounts"
MetaEveryMinute = "*every_minute"
MetaHourly = "*hourly"
+ ID = "ID"
+ MetaASR = "*asr"
+ MetaACD = "*acd"
)