diff --git a/apier/v1/stats.go b/apier/v1/stats.go
index c3b90c148..09583f122 100644
--- a/apier/v1/stats.go
+++ b/apier/v1/stats.go
@@ -17,6 +17,7 @@ along with this program. If not, see
*/
package v1
+/*
import (
"reflect"
"strings"
@@ -136,3 +137,4 @@ func (apierV1 *ApierV1) RemStatQueueProfile(attrs AttrGetStatsCfg, reply *string
return nil
}
+*/
diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go
index 92bb59c34..b32173f09 100644
--- a/apier/v1/stats_it_test.go
+++ b/apier/v1/stats_it_test.go
@@ -19,6 +19,7 @@ along with this program. If not, see
*/
package v1
+/*
import (
"math/rand"
"net/rpc"
@@ -346,3 +347,4 @@ func BenchmarkStatSV1GetStringMetrics(b *testing.B) {
}
}
}
+*/
diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index 399ab8fd2..b8ca5a237 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -39,7 +39,6 @@ import (
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/sessionmanager"
- "github.com/cgrates/cgrates/stats"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
@@ -560,6 +559,7 @@ func startResourceService(internalRsChan, internalStatSConn chan rpcclient.RpcCl
internalRsChan <- rsV1
}
+/*
// startStatService fires up the StatS
func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dataDB engine.DataDB, ms engine.Marshaler, server *utils.Server, exitChan chan bool) {
@@ -582,6 +582,7 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg
server.RpcRegister(stsV1)
internalStatSChan <- stsV1
}
+*/
func startRpc(server *utils.Server, internalRaterChan,
internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan,
@@ -696,17 +697,6 @@ func main() {
// Init cache
cache.NewCache(cfg.CacheConfig)
- var ms engine.Marshaler
- if ms, err = engine.NewMarshaler(cfg.DBDataEncoding); err != nil {
- log.Fatalf("error initializing marshaler: ", err)
- return
- }
- if err != nil {
- utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error()))
- exitChan <- true
- return
- }
-
var dataDB engine.DataDB
var loadDb engine.LoadStorage
var cdrDb engine.CdrStorage
@@ -861,9 +851,9 @@ func main() {
internalStatSChan, cfg, dataDB, server, exitChan)
}
- if cfg.StatSCfg().Enabled {
- go startStatService(internalStatSChan, cfg, dataDB, ms, server, exitChan)
- }
+ //if cfg.StatSCfg().Enabled {
+ // go startStatService(internalStatSChan, cfg, dataDB, ms, server, exitChan)
+ //}
// Serve rpc connections
go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan,
diff --git a/engine/libstats.go b/engine/libstats.go
index 539dbdf77..147b157e2 100755
--- a/engine/libstats.go
+++ b/engine/libstats.go
@@ -19,6 +19,7 @@ package engine
import (
"errors"
+ "sort"
"time"
"github.com/cgrates/cgrates/utils"
@@ -30,14 +31,6 @@ type SQItem struct {
ExpiryTime *time.Time // Used to auto-expire events
}
-// SQStoredMetrics contains metrics saved in DB
-type SQStoredMetrics struct {
- SqID string // StatsInstanceID
- SEvents map[string]StatsEvent // Events used by SQItems
- SQItems []*SQItem // SQItems
- SQMetrics map[string][]byte
-}
-
// StatsConfig represents the configuration of a StatsInstance in StatS
type StatQueueProfile struct {
ID string // QueueID
@@ -78,3 +71,123 @@ func (se StatsEvent) AnswerTime(timezone string) (at time.Time, err error) {
}
return utils.ParseTimeDetectLayout(atStr, timezone)
}
+
+// StatQueue represents an individual stats instance
+type StatQueue struct {
+ ID string
+ SQItems []*SQItem // SQItems
+ SQMetrics map[string]StatsMetric
+ sqItems []*SQItem
+ sqPrfl *StatQueueProfile
+ dirty *bool // needs save
+}
+
+/*
+// GetSQStoredMetrics retrieves the data used for store to DB
+func (sq *StatQueue) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
+ sq.RLock()
+ defer sq.RUnlock()
+ sEvents := make(map[string]engine.StatsEvent)
+ var sItems []*engine.SQItem
+ for _, sqItem := range sq.sqItems { // make sure event is properly retrieved from cache
+ ev := sq.sec.GetEvent(sqItem.EventID)
+ if ev == nil {
+ utils.Logger.Warning(fmt.Sprintf(" querying for storage eventID: %s, error: event not cached",
+ sqItem.EventID))
+ continue
+ }
+ sEvents[sqItem.EventID] = ev
+ sItems = append(sItems, sqItem)
+ }
+ sqSM = &engine.SQStoredMetrics{
+ SEvents: sEvents,
+ SQItems: sItems,
+ SQMetrics: make(map[string][]byte, len(sq.sqMetrics))}
+ for metricID, metric := range sq.sqMetrics {
+ var err error
+ if sqSM.SQMetrics[metricID], err = metric.GetMarshaled(sq.ms); err != nil {
+ utils.Logger.Warning(fmt.Sprintf(" querying for storage metricID: %s, error: %s",
+ metricID, err.Error()))
+ continue
+ }
+ }
+ return
+}
+
+// ProcessEvent processes a StatsEvent, returns true if processed
+func (sq *StatQueue) ProcessEvent(ev engine.StatsEvent) (err error) {
+ sq.Lock()
+ sq.remExpired()
+ sq.remOnQueueLength()
+ sq.addStatsEvent(ev)
+ sq.Unlock()
+ return
+}
+
+// remExpired expires items in queue
+func (sq *StatQueue) remExpired() {
+ var expIdx *int // index of last item to be expired
+ for i, item := range sq.sqItems {
+ if item.ExpiryTime == nil {
+ break
+ }
+ if item.ExpiryTime.After(time.Now()) {
+ break
+ }
+ sq.remEventWithID(item.EventID)
+ item = nil // garbage collected asap
+ expIdx = &i
+ }
+ if expIdx == nil {
+ return
+ }
+ nextValidIdx := *expIdx + 1
+ sq.sqItems = sq.sqItems[nextValidIdx:]
+}
+
+// remOnQueueLength rems elements based on QueueLength setting
+func (sq *StatQueue) remOnQueueLength() {
+ if sq.cfg.QueueLength == 0 {
+ return
+ }
+ if len(sq.sqItems) == sq.cfg.QueueLength { // reached limit, rem first element
+ itm := sq.sqItems[0]
+ sq.remEventWithID(itm.EventID)
+ itm = nil
+ sq.sqItems = sq.sqItems[1:]
+ }
+}
+
+// addStatsEvent computes metrics for an event
+func (sq *StatQueue) addStatsEvent(ev engine.StatsEvent) {
+ evID := ev.ID()
+ for metricID, metric := range sq.sqMetrics {
+ if err := metric.AddEvent(ev); err != nil {
+ utils.Logger.Warning(fmt.Sprintf(" metricID: %s, add eventID: %s, error: %s",
+ metricID, evID, err.Error()))
+ }
+ }
+}
+
+// remStatsEvent removes an event from metrics
+func (sq *StatQueue) remEventWithID(evID string) {
+ ev := sq.sec.GetEvent(evID)
+ if ev == nil {
+ utils.Logger.Warning(fmt.Sprintf(" removing eventID: %s, error: event not cached", evID))
+ return
+ }
+ for metricID, metric := range sq.sqMetrics {
+ if err := metric.RemEvent(ev); err != nil {
+ utils.Logger.Warning(fmt.Sprintf(" metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error()))
+ }
+ }
+}
+*/
+
+// StatQueues is a sortable list of StatQueue
+type StatQueues []*StatQueue
+
+// Sort is part of sort interface, sort based on Weight
+func (sis StatQueues) Sort() {
+ sort.Slice(sis, func(i, j int) bool { return sis[i].sqPrfl.Weight > sis[j].sqPrfl.Weight })
+}
diff --git a/stats/queue_test.go b/engine/libstats_test.go
similarity index 60%
rename from stats/queue_test.go
rename to engine/libstats_test.go
index dd98308ac..0e3230d7b 100644
--- a/stats/queue_test.go
+++ b/engine/libstats_test.go
@@ -15,28 +15,26 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see
*/
-package stats
+package engine
import (
"reflect"
"testing"
-
- "github.com/cgrates/cgrates/engine"
)
func TestStatQueuesSort(t *testing.T) {
sInsts := StatQueues{
- &StatQueue{sqp: &engine.StatQueueProfile{ID: "FIRST", Weight: 30.0}},
- &StatQueue{sqp: &engine.StatQueueProfile{ID: "SECOND", Weight: 40.0}},
- &StatQueue{sqp: &engine.StatQueueProfile{ID: "THIRD", Weight: 30.0}},
- &StatQueue{sqp: &engine.StatQueueProfile{ID: "FOURTH", Weight: 35.0}},
+ &StatQueue{sqPrfl: &StatQueueProfile{ID: "FIRST", Weight: 30.0}},
+ &StatQueue{sqPrfl: &StatQueueProfile{ID: "SECOND", Weight: 40.0}},
+ &StatQueue{sqPrfl: &StatQueueProfile{ID: "THIRD", Weight: 30.0}},
+ &StatQueue{sqPrfl: &StatQueueProfile{ID: "FOURTH", Weight: 35.0}},
}
sInsts.Sort()
eSInst := StatQueues{
- &StatQueue{sqp: &engine.StatQueueProfile{ID: "SECOND", Weight: 40.0}},
- &StatQueue{sqp: &engine.StatQueueProfile{ID: "FOURTH", Weight: 35.0}},
- &StatQueue{sqp: &engine.StatQueueProfile{ID: "FIRST", Weight: 30.0}},
- &StatQueue{sqp: &engine.StatQueueProfile{ID: "THIRD", Weight: 30.0}},
+ &StatQueue{sqPrfl: &StatQueueProfile{ID: "SECOND", Weight: 40.0}},
+ &StatQueue{sqPrfl: &StatQueueProfile{ID: "FOURTH", Weight: 35.0}},
+ &StatQueue{sqPrfl: &StatQueueProfile{ID: "FIRST", Weight: 30.0}},
+ &StatQueue{sqPrfl: &StatQueueProfile{ID: "THIRD", Weight: 30.0}},
}
if !reflect.DeepEqual(eSInst, sInsts) {
t.Errorf("expecting: %+v, received: %+v", eSInst, sInsts)
diff --git a/engine/statmetrics.go b/engine/statmetrics.go
new file mode 100644
index 000000000..71cca8fdc
--- /dev/null
+++ b/engine/statmetrics.go
@@ -0,0 +1,149 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package engine
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/utils"
+)
+
+// NewStatsMetrics instantiates the StatsMetrics
+// cfg serves as general purpose container to pass config options to metric
+func NewStatsMetric(metricID string) (sm StatsMetric, err error) {
+ metrics := map[string]func() (StatsMetric, error){
+ utils.MetaASR: NewASR,
+ utils.MetaACD: NewACD,
+ }
+ if _, has := metrics[metricID]; !has {
+ return nil, fmt.Errorf("unsupported metric: %s", metricID)
+ }
+ return metrics[metricID]()
+}
+
+// StatsMetric is the interface which a metric should implement
+type StatsMetric interface {
+ GetValue() interface{}
+ GetStringValue(fmtOpts string) (val string)
+ GetFloat64Value() (val float64)
+ AddEvent(ev StatsEvent) error
+ RemEvent(ev StatsEvent) error
+ GetMarshaled(ms Marshaler) (vals []byte, err error)
+ SetFromMarshaled(vals []byte, ms Marshaler) (err error) // mostly used to load from DB
+}
+
+func NewASR() (StatsMetric, error) {
+ return new(ASRStat), nil
+}
+
+// ASR implements AverageSuccessRatio metric
+type ASRStat struct {
+ Answered float64
+ Count float64
+}
+
+func (asr *ASRStat) GetValue() (v interface{}) {
+ if asr.Count == 0 {
+ return float64(STATS_NA)
+ }
+ return utils.Round((asr.Answered / asr.Count * 100),
+ config.CgrConfig().RoundingDecimals, utils.ROUNDING_MIDDLE)
+}
+
+func (asr *ASRStat) GetStringValue(fmtOpts string) (valStr string) {
+ if asr.Count == 0 {
+ return utils.NOT_AVAILABLE
+ }
+ val := asr.GetValue().(float64)
+ return fmt.Sprintf("%v%%", val) // %v will automatically limit the number of decimals printed
+}
+
+func (asr *ASRStat) GetFloat64Value() (val float64) {
+ return asr.GetValue().(float64)
+}
+
+func (asr *ASRStat) AddEvent(ev StatsEvent) (err error) {
+ if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil &&
+ err != utils.ErrNotFound {
+ return err
+ } else if !at.IsZero() {
+ asr.Answered += 1
+ }
+ asr.Count += 1
+ return
+}
+
+func (asr *ASRStat) RemEvent(ev StatsEvent) (err error) {
+ if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil &&
+ err != utils.ErrNotFound {
+ return err
+ } else if !at.IsZero() {
+ asr.Answered -= 1
+ }
+ asr.Count -= 1
+ return
+}
+
+func (asr *ASRStat) GetMarshaled(ms Marshaler) (vals []byte, err error) {
+ return ms.Marshal(asr)
+}
+
+func (asr *ASRStat) SetFromMarshaled(vals []byte, ms Marshaler) (err error) {
+ return ms.Unmarshal(vals, asr)
+}
+
+func NewACD() (StatsMetric, error) {
+ return new(ACDStat), nil
+}
+
+// ACD implements AverageCallDuration metric
+type ACDStat struct {
+ Sum time.Duration
+ Count int
+}
+
+func (acd *ACDStat) GetStringValue(fmtOpts string) (val string) {
+ return
+}
+
+func (acd *ACDStat) GetValue() (v interface{}) {
+ return
+}
+
+func (acd *ACDStat) GetFloat64Value() (v float64) {
+ return float64(STATS_NA)
+}
+
+func (acd *ACDStat) AddEvent(ev StatsEvent) (err error) {
+ return
+}
+
+func (acd *ACDStat) RemEvent(ev StatsEvent) (err error) {
+ return
+}
+
+func (acd *ACDStat) GetMarshaled(ms Marshaler) (vals []byte, err error) {
+ return
+}
+
+func (acd *ACDStat) SetFromMarshaled(vals []byte, ms Marshaler) (err error) {
+ return
+}
diff --git a/stats/asr_test.go b/engine/statmetrics_test.go
similarity index 97%
rename from stats/asr_test.go
rename to engine/statmetrics_test.go
index c0e1cb136..87beed89d 100644
--- a/stats/asr_test.go
+++ b/engine/statmetrics_test.go
@@ -15,13 +15,13 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see
*/
-package stats
+package engine
+/*
import (
"testing"
"time"
- "github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -83,3 +83,4 @@ func TestASRGetValue(t *testing.T) {
t.Errorf("wrong asr value: %f", v)
}
}
+*/
diff --git a/stats/service.go b/engine/stats.go
old mode 100755
new mode 100644
similarity index 78%
rename from stats/service.go
rename to engine/stats.go
index 12660383e..18b775b26
--- a/stats/service.go
+++ b/engine/stats.go
@@ -15,61 +15,37 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see
*/
-package stats
+
+package engine
import (
- "errors"
- "fmt"
"math/rand"
- "reflect"
- "strings"
- "sync"
"time"
-
- "github.com/cgrates/cgrates/engine"
- "github.com/cgrates/cgrates/utils"
- "github.com/cgrates/rpcclient"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
+/*
// NewStatService initializes a StatService
-func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval time.Duration) (ss *StatService, err error) {
+func NewStatService(dataDB DataDB, ms Marshaler, storeInterval time.Duration) (ss *StatService, err error) {
ss = &StatService{dataDB: dataDB, ms: ms, storeInterval: storeInterval,
- stopStoring: make(chan struct{}), evCache: NewStatsEventCache()}
- sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatQueueProfilePrefix)
+ stopStoring: make(chan struct{})}
+ sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsConfigPrefix)
if err != nil {
return nil, err
}
- ss.queuesCache = make(map[string]*StatQueue)
- ss.queues = make(StatQueues, 0)
- for _, prfx := range sqPrfxs {
- if q, err := ss.loadQueue(prfx[len(utils.StatQueueProfilePrefix):]); err != nil {
- utils.Logger.Err(fmt.Sprintf(" failed loading quueue with id: <%s>, err: <%s>",
- q.sqp.ID, err.Error()))
- continue
- } else {
- ss.setQueue(q)
- }
- }
- ss.queues.Sort()
go ss.dumpStoredMetrics() // start dumpStoredMetrics loop
return
}
// StatService builds stats for events
type StatService struct {
- sync.RWMutex
- dataDB engine.DataDB
- ms engine.Marshaler
+ dataDB DataDB
+ ms Marshaler
storeInterval time.Duration
stopStoring chan struct{}
- evCache *StatsEventCache // so we can pass it to queues
- queuesCache map[string]*StatQueue // unordered db of StatQueues, used for fast queries
- queues StatQueues // ordered list of StatQueues
-
}
// ListenAndServe loops keeps the service alive
@@ -92,21 +68,15 @@ func (ss *StatService) Shutdown() error {
// setQueue adds or modifies a queue into cache
// sort will reorder the ss.queues
func (ss *StatService) loadQueue(qID string) (q *StatQueue, err error) {
- sq, err := ss.dataDB.GetStatQueueProfile(qID)
+ sq, err := ss.dataDB.GetStatsConfig(qID)
if err != nil {
return nil, err
}
- var sqSM *engine.SQStoredMetrics
- if sq.Store {
- if sqSM, err = ss.dataDB.GetSQStoredMetrics(sq.ID); err != nil && err != utils.ErrNotFound {
- return nil, err
- }
- }
return NewStatQueue(ss.evCache, ss.ms, sq, sqSM)
}
func (ss *StatService) setQueue(q *StatQueue) {
- ss.queuesCache[q.sqp.ID] = q
+ ss.queuesCache[q.cfg.ID] = q
ss.queues = append(ss.queues, q)
}
@@ -121,14 +91,14 @@ func (ss *StatService) remQueue(qID string) (si *StatQueue) {
// store stores the necessary storedMetrics to dataDB
func (ss *StatService) storeMetrics() {
for _, si := range ss.queues {
- if !si.sqp.Store || !si.dirty { // no need to save
+ if !si.cfg.Store || !si.dirty { // no need to save
continue
}
if siSM := si.GetStoredMetrics(); siSM != nil {
if err := ss.dataDB.SetSQStoredMetrics(siSM); err != nil {
utils.Logger.Warning(
fmt.Sprintf(" failed saving StoredMetrics for QueueID: %s, error: %s",
- si.sqp.ID, err.Error()))
+ si.cfg.ID, err.Error()))
}
}
// randomize the CPU load and give up thread control
@@ -150,7 +120,7 @@ func (ss *StatService) dumpStoredMetrics() {
}
// processEvent processes a StatsEvent through the queues and caches it when needed
-func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) {
+func (ss *StatService) processEvent(ev StatsEvent) (err error) {
evStatsID := ev.ID()
if evStatsID == "" { // ID is mandatory
return errors.New("missing ID field")
@@ -159,9 +129,9 @@ func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) {
if err := stInst.ProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s",
- stInst.sqp.ID, evStatsID, err.Error()))
+ stInst.cfg.ID, evStatsID, err.Error()))
}
- if stInst.sqp.Blocker {
+ if stInst.cfg.Blocker {
break
}
}
@@ -169,7 +139,7 @@ func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) {
}
// V1ProcessEvent implements StatV1 method for processing an Event
-func (ss *StatService) V1ProcessEvent(ev engine.StatsEvent, reply *string) (err error) {
+func (ss *StatService) V1ProcessEvent(ev StatsEvent, reply *string) (err error) {
if err = ss.processEvent(ev); err == nil {
*reply = utils.OK
}
@@ -225,13 +195,13 @@ type ArgsLoadQueues struct {
func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) {
qIDs := args.QueueIDs
if qIDs == nil {
- sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatQueueProfilePrefix)
+ sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsConfigPrefix)
if err != nil {
return err
}
queueIDs := make([]string, len(sqPrfxs))
for i, prfx := range sqPrfxs {
- queueIDs[i] = prfx[len(utils.StatQueueProfilePrefix):]
+ queueIDs[i] = prfx[len(utils.StatsConfigPrefix):]
}
if len(queueIDs) != 0 {
qIDs = &queueIDs
@@ -247,7 +217,7 @@ func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err err
}
if q, err := ss.loadQueue(qID); err != nil {
utils.Logger.Err(fmt.Sprintf(" failed loading quueue with id: <%s>, err: <%s>",
- q.sqp.ID, err.Error()))
+ q.cfg.ID, err.Error()))
continue
} else {
sQs = append(sQs, q)
@@ -288,3 +258,5 @@ func (ss *StatService) Call(serviceMethod string, args interface{}, reply interf
}
return err
}
+
+*/
diff --git a/stats/service_test.go b/engine/stats_test.go
similarity index 94%
rename from stats/service_test.go
rename to engine/stats_test.go
index 2cbf9b90d..3cc424c9e 100644
--- a/stats/service_test.go
+++ b/engine/stats_test.go
@@ -15,14 +15,14 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see
*/
-package stats
+package engine
+/*
import (
"testing"
"time"
"github.com/cgrates/cgrates/config"
- "github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -32,8 +32,8 @@ func TestReqFilterPassStatS(t *testing.T) {
config.SetCgrConfig(cgrCfg)
}
dataStorage, _ := engine.NewMapStorage()
- dataStorage.SetStatQueueProfile(
- &engine.StatQueueProfile{ID: "CDRST1",
+ dataStorage.SetStatsConfig(
+ &engine.StatsConfig{ID: "CDRST1",
Filters: []*engine.RequestFilter{
&engine.RequestFilter{Type: engine.MetaString, FieldName: "Tenant",
Values: []string{"cgrates.org"}}},
@@ -75,3 +75,4 @@ func TestReqFilterPassStatS(t *testing.T) {
t.Error("Not passing")
}
}
+*/
diff --git a/engine/storage_interface.go b/engine/storage_interface.go
index 115d153ba..d7e8230e1 100755
--- a/engine/storage_interface.go
+++ b/engine/storage_interface.go
@@ -115,9 +115,6 @@ type DataDB interface {
GetStatQueueProfile(sqID string) (sq *StatQueueProfile, err error)
SetStatQueueProfile(sq *StatQueueProfile) (err error)
RemStatQueueProfile(sqID string) (err error)
- GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error)
- SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error)
- RemSQStoredMetrics(sqID string) (err error)
GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error)
SetThresholdCfg(th *ThresholdCfg) (err error)
RemThresholdCfg(ID string, transactionID string) (err error)
diff --git a/engine/storage_map.go b/engine/storage_map.go
index 6fc777de5..37e672144 100755
--- a/engine/storage_map.go
+++ b/engine/storage_map.go
@@ -1513,36 +1513,36 @@ func (ms *MapStorage) RemStatQueueProfile(scfID string) (err error) {
return
}
-// GetSQStoredMetrics retrieves the stored metrics for a StatsQueue
-func (ms *MapStorage) GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error) {
+// GetStatQueue retrieves the stored metrics for a StatsQueue
+func (ms *MapStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
- values, ok := ms.dict[utils.SQStoredMetricsPrefix+sqID]
+ values, ok := ms.dict[utils.StatQueuePrefix+sqID]
if !ok {
return nil, utils.ErrNotFound
}
- err = ms.ms.Unmarshal(values, &sqSM)
+ err = ms.ms.Unmarshal(values, &sq)
return
}
-// SetStoredSQ stores the metrics for a StatsQueue
-func (ms *MapStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) {
+// SetStatQueue stores the metrics for a StatsQueue
+func (ms *MapStorage) SetStatQueue(sq *StatQueue) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
var result []byte
- result, err = ms.ms.Marshal(sqSM)
+ result, err = ms.ms.Marshal(sq)
if err != nil {
return err
}
- ms.dict[utils.SQStoredMetricsPrefix+sqSM.SqID] = result
+ ms.dict[utils.StatQueuePrefix+sq.ID] = result
return
}
-// RemSQStoredMetrics removes stored metrics for a StatsQueue
-func (ms *MapStorage) RemSQStoredMetrics(sqID string) (err error) {
+// RemStatQueue removes a StatsQueue
+func (ms *MapStorage) RemStatQueue(sqID string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
- delete(ms.dict, utils.SQStoredMetricsPrefix+sqID)
+ delete(ms.dict, utils.StatQueuePrefix+sqID)
return
}
diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go
index 898e151d1..e530d7194 100755
--- a/engine/storage_mongo_datadb.go
+++ b/engine/storage_mongo_datadb.go
@@ -61,6 +61,7 @@ const (
colRFI = "request_filter_indexes"
colTmg = "timings"
colRes = "resources"
+ colStQs = "statqueues"
)
var (
@@ -2045,11 +2046,11 @@ func (ms *MongoStorage) RemStatQueueProfile(sqID string) (err error) {
return
}
-// GetSQStoredMetrics retrieves the stored metrics for a StatsQueue
-func (ms *MongoStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics, err error) {
- session, col := ms.conn(utils.SQStoredMetricsPrefix)
+// GetStatQueue retrieves a StatsQueue
+func (ms *MongoStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) {
+ session, col := ms.conn(colStQs)
defer session.Close()
- if err = col.Find(bson.M{"sqid": sqmID}).One(&sqSM); err != nil {
+ if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil {
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
}
@@ -2059,16 +2060,16 @@ func (ms *MongoStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics,
}
// SetStoredSQ stores the metrics for a StatsQueue
-func (ms *MongoStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) {
- session, col := ms.conn(utils.SQStoredMetricsPrefix)
+func (ms *MongoStorage) SetStatQueue(sq *StatQueue) (err error) {
+ session, col := ms.conn(colStQs)
defer session.Close()
- _, err = col.UpsertId(bson.M{"sqid": sqSM.SqID}, sqSM)
+ _, err = col.Upsert(bson.M{"id": sq.ID}, sq)
return
}
-// RemSQStoredMetrics removes stored metrics for a StatsQueue
-func (ms *MongoStorage) RemSQStoredMetrics(sqmID string) (err error) {
- session, col := ms.conn(utils.SQStoredMetricsPrefix)
+// RemStatQueue removes stored metrics for a StatsQueue
+func (ms *MongoStorage) RemStatQueue(sqmID string) (err error) {
+ session, col := ms.conn(colStQs)
defer session.Close()
err = col.Remove(bson.M{"sqid": sqmID})
return err
diff --git a/engine/storage_redis.go b/engine/storage_redis.go
index 6561f453a..5febb1000 100755
--- a/engine/storage_redis.go
+++ b/engine/storage_redis.go
@@ -1620,9 +1620,9 @@ func (rs *RedisStorage) RemStatQueueProfile(sqID string) (err error) {
return
}
-// GetSQStoredMetrics retrieves the stored metrics for a StatsQueue
-func (rs *RedisStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics, err error) {
- key := utils.SQStoredMetricsPrefix + sqmID
+// GetStatQueue retrieves the stored metrics for a StatsQueue
+func (rs *RedisStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) {
+ key := utils.StatQueuePrefix + sqID
var values []byte
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
if err == redis.ErrRespNil {
@@ -1630,25 +1630,25 @@ func (rs *RedisStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics,
}
return
}
- if err = rs.ms.Unmarshal(values, &sqSM); err != nil {
+ if err = rs.ms.Unmarshal(values, &sq); err != nil {
return
}
return
}
-// SetStoredSQ stores the metrics for a StatsQueue
-func (rs *RedisStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) {
+// SetStatQueue stores the metrics for a StatsQueue
+func (rs *RedisStorage) SetStatQueue(sq *StatQueue) (err error) {
var result []byte
- result, err = rs.ms.Marshal(sqSM)
+ result, err = rs.ms.Marshal(sq)
if err != nil {
return
}
- return rs.Cmd("SET", utils.SQStoredMetricsPrefix+sqSM.SqID, result).Err
+ return rs.Cmd("SET", utils.StatQueuePrefix+sq.ID, result).Err
}
-// RemSQStoredMetrics removes stored metrics for a StatsQueue
-func (rs *RedisStorage) RemSQStoredMetrics(sqmID string) (err error) {
- key := utils.SQStoredMetricsPrefix + sqmID
+// RemStatQueue removes a StatsQueue
+func (rs *RedisStorage) RemStatQueue(sqID string) (err error) {
+ key := utils.StatQueuePrefix + sqID
if err = rs.Cmd("DEL", key).Err; err != nil {
return
}
diff --git a/migrator/v1DataDB.go b/migrator/v1datadb.go
similarity index 100%
rename from migrator/v1DataDB.go
rename to migrator/v1datadb.go
diff --git a/migrator/v1Migrator_Utils.go b/migrator/v1migrator_utils.go
similarity index 100%
rename from migrator/v1Migrator_Utils.go
rename to migrator/v1migrator_utils.go
diff --git a/migrator/v1MongoData.go b/migrator/v1mongo_data.go
similarity index 100%
rename from migrator/v1MongoData.go
rename to migrator/v1mongo_data.go
diff --git a/migrator/v1Redis.go b/migrator/v1redis.go
similarity index 100%
rename from migrator/v1Redis.go
rename to migrator/v1redis.go
diff --git a/stats/acd.go b/stats/acd.go
deleted file mode 100644
index f2a6cdfb9..000000000
--- a/stats/acd.go
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package stats
-
-import (
- "time"
-
- "github.com/cgrates/cgrates/engine"
-)
-
-func NewACD() (StatsMetric, error) {
- return new(ACD), nil
-}
-
-// ACD implements AverageCallDuration metric
-type ACD struct {
- Sum time.Duration
- Count int
-}
-
-func (acd *ACD) GetStringValue(fmtOpts string) (val string) {
- return
-}
-
-func (acd *ACD) GetValue() (v interface{}) {
- return
-}
-
-func (acd *ACD) GetFloat64Value() (v float64) {
- return float64(engine.STATS_NA)
-}
-
-func (acd *ACD) AddEvent(ev engine.StatsEvent) (err error) {
- return
-}
-
-func (acd *ACD) RemEvent(ev engine.StatsEvent) (err error) {
- return
-}
-
-func (acd *ACD) GetMarshaled(ms engine.Marshaler) (vals []byte, err error) {
- return
-}
-
-func (acd *ACD) SetFromMarshaled(vals []byte, ms engine.Marshaler) (err error) {
- return
-}
diff --git a/stats/asr.go b/stats/asr.go
deleted file mode 100644
index 4b24c450f..000000000
--- a/stats/asr.go
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package stats
-
-import (
- "fmt"
-
- "github.com/cgrates/cgrates/config"
- "github.com/cgrates/cgrates/engine"
- "github.com/cgrates/cgrates/utils"
-)
-
-func NewASR() (StatsMetric, error) {
- return new(ASR), nil
-}
-
-// ASR implements AverageSuccessRatio metric
-type ASR struct {
- Answered float64
- Count float64
-}
-
-func (asr *ASR) GetValue() (v interface{}) {
- if asr.Count == 0 {
- return float64(engine.STATS_NA)
- }
- return utils.Round((asr.Answered / asr.Count * 100),
- config.CgrConfig().RoundingDecimals, utils.ROUNDING_MIDDLE)
-}
-
-func (asr *ASR) GetStringValue(fmtOpts string) (valStr string) {
- if asr.Count == 0 {
- return utils.NOT_AVAILABLE
- }
- val := asr.GetValue().(float64)
- return fmt.Sprintf("%v%%", val) // %v will automatically limit the number of decimals printed
-}
-
-func (asr *ASR) GetFloat64Value() (val float64) {
- return asr.GetValue().(float64)
-}
-
-func (asr *ASR) AddEvent(ev engine.StatsEvent) (err error) {
- if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil &&
- err != utils.ErrNotFound {
- return err
- } else if !at.IsZero() {
- asr.Answered += 1
- }
- asr.Count += 1
- return
-}
-
-func (asr *ASR) RemEvent(ev engine.StatsEvent) (err error) {
- if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil &&
- err != utils.ErrNotFound {
- return err
- } else if !at.IsZero() {
- asr.Answered -= 1
- }
- asr.Count -= 1
- return
-}
-
-func (asr *ASR) GetMarshaled(ms engine.Marshaler) (vals []byte, err error) {
- return ms.Marshal(asr)
-}
-
-func (asr *ASR) SetFromMarshaled(vals []byte, ms engine.Marshaler) (err error) {
- return ms.Unmarshal(vals, asr)
-}
diff --git a/stats/eventcache.go b/stats/eventcache.go
deleted file mode 100644
index 6c8d17302..000000000
--- a/stats/eventcache.go
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-package stats
-
-import (
- "sync"
-
- "github.com/cgrates/cgrates/engine"
- "github.com/cgrates/cgrates/utils"
-)
-
-// NewStatsEventCache instantiates a StatsEventCache
-func NewStatsEventCache() *StatsEventCache {
- return &StatsEventCache{
- evCacheIdx: make(map[string]utils.StringMap),
- evCache: make(map[string]engine.StatsEvent)}
-}
-
-// StatsEventCache keeps a cache of StatsEvents which are referenced by StatsQueues
-type StatsEventCache struct {
- sync.RWMutex
- evCacheIdx map[string]utils.StringMap // index events used in queues, map[eventID]map[queueID]bool
- evCache map[string]engine.StatsEvent // cache for the processed events
-}
-
-// Cache will cache an event and reference it in the index
-func (sec *StatsEventCache) Cache(evID string, ev engine.StatsEvent, queueID string) {
- if utils.IsSliceMember([]string{evID, queueID}, "") {
- return
- }
- sec.Lock()
- if _, hasIt := sec.evCache[evID]; !hasIt {
- sec.evCache[evID] = ev
- }
- sec.evCacheIdx[evID][queueID] = true
- sec.Unlock()
-}
-
-func (sec *StatsEventCache) UnCache(evID string, ev engine.StatsEvent, queueID string) {
- sec.Lock()
- if _, hasIt := sec.evCache[evID]; !hasIt {
- return
- }
- delete(sec.evCacheIdx[evID], queueID)
- if len(sec.evCacheIdx[evID]) == 0 {
- delete(sec.evCacheIdx, evID)
- delete(sec.evCache, evID)
- }
- sec.Unlock()
-}
-
-// GetEvent returns the event based on ID
-func (sec *StatsEventCache) GetEvent(evID string) engine.StatsEvent {
- sec.RLock()
- defer sec.RUnlock()
- return sec.evCache[evID]
-}
diff --git a/stats/metric.go b/stats/metric.go
deleted file mode 100644
index 51ee293dd..000000000
--- a/stats/metric.go
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package stats
-
-import (
- "fmt"
-
- "github.com/cgrates/cgrates/engine"
- "github.com/cgrates/cgrates/utils"
-)
-
-// NewStatsMetrics instantiates the StatsMetrics
-// cfg serves as general purpose container to pass config options to metric
-func NewStatsMetric(metricID string) (sm StatsMetric, err error) {
- metrics := map[string]func() (StatsMetric, error){
- utils.MetaASR: NewASR,
- utils.MetaACD: NewACD,
- }
- if _, has := metrics[metricID]; !has {
- return nil, fmt.Errorf("unsupported metric: %s", metricID)
- }
- return metrics[metricID]()
-}
-
-// StatsMetric is the interface which a metric should implement
-type StatsMetric interface {
- GetValue() interface{}
- GetStringValue(fmtOpts string) (val string)
- GetFloat64Value() (val float64)
- AddEvent(ev engine.StatsEvent) error
- RemEvent(ev engine.StatsEvent) error
- GetMarshaled(ms engine.Marshaler) (vals []byte, err error)
- SetFromMarshaled(vals []byte, ms engine.Marshaler) (err error) // mostly used to load from DB
-}
diff --git a/stats/queue.go b/stats/queue.go
deleted file mode 100644
index afadef9b9..000000000
--- a/stats/queue.go
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-package stats
-
-import (
- "fmt"
- "sort"
- "sync"
- "time"
-
- "github.com/cgrates/cgrates/engine"
- "github.com/cgrates/cgrates/utils"
-)
-
-// StatQueues is a sortable list of StatQueue
-type StatQueues []*StatQueue
-
-// Sort is part of sort interface, sort based on Weight
-func (sis StatQueues) Sort() {
- sort.Slice(sis, func(i, j int) bool { return sis[i].sqp.Weight > sis[j].sqp.Weight })
-}
-
-// remWithID removes the queue with ID from slice
-func (sis StatQueues) remWithID(qID string) {
- for i, q := range sis {
- if q.sqp.ID == qID {
- copy(sis[i:], sis[i+1:])
- sis[len(sis)-1] = nil
- sis = sis[:len(sis)-1]
- break // there can be only one item with ID
- }
- }
-}
-
-// NewStatQueue instantiates a StatQueue
-func NewStatQueue(sec *StatsEventCache, ms engine.Marshaler,
- sqCfg *engine.StatQueueProfile, sqSM *engine.SQStoredMetrics) (si *StatQueue, err error) {
- si = &StatQueue{sec: sec, ms: ms, sqp: sqCfg, sqMetrics: make(map[string]StatsMetric)}
- for _, metricID := range sqCfg.Metrics {
- if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil {
- return
- }
- }
- if sqSM != nil {
- for evID, ev := range sqSM.SEvents {
- si.sec.Cache(evID, ev, si.sqp.ID)
- }
- si.sqItems = sqSM.SQItems
- for metricID := range si.sqMetrics {
- if _, has := si.sqMetrics[metricID]; !has {
- if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil {
- return
- }
- }
- if stored, has := sqSM.SQMetrics[metricID]; !has {
- continue
- } else if err = si.sqMetrics[metricID].SetFromMarshaled(stored, ms); err != nil {
- return
- }
- }
- }
- return
-}
-
-// StatQueue represents an individual stats instance
-type StatQueue struct {
- sync.RWMutex
- dirty bool // needs save
- sec *StatsEventCache
- sqItems []*engine.SQItem
- sqMetrics map[string]StatsMetric
- ms engine.Marshaler // used to get/set Metrics
- sqp *engine.StatQueueProfile
-}
-
-// GetSQStoredMetrics retrieves the data used for store to DB
-func (sq *StatQueue) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
- sq.RLock()
- defer sq.RUnlock()
- sEvents := make(map[string]engine.StatsEvent)
- var sItems []*engine.SQItem
- for _, sqItem := range sq.sqItems { // make sure event is properly retrieved from cache
- ev := sq.sec.GetEvent(sqItem.EventID)
- if ev == nil {
- utils.Logger.Warning(fmt.Sprintf(" querying for storage eventID: %s, error: event not cached",
- sqItem.EventID))
- continue
- }
- sEvents[sqItem.EventID] = ev
- sItems = append(sItems, sqItem)
- }
- sqSM = &engine.SQStoredMetrics{
- SEvents: sEvents,
- SQItems: sItems,
- SQMetrics: make(map[string][]byte, len(sq.sqMetrics))}
- for metricID, metric := range sq.sqMetrics {
- var err error
- if sqSM.SQMetrics[metricID], err = metric.GetMarshaled(sq.ms); err != nil {
- utils.Logger.Warning(fmt.Sprintf(" querying for storage metricID: %s, error: %s",
- metricID, err.Error()))
- continue
- }
- }
- return
-}
-
-// ProcessEvent processes a StatsEvent, returns true if processed
-func (sq *StatQueue) ProcessEvent(ev engine.StatsEvent) (err error) {
- sq.Lock()
- sq.remExpired()
- sq.remOnQueueLength()
- sq.addStatsEvent(ev)
- sq.Unlock()
- return
-}
-
-// remExpired expires items in queue
-func (sq *StatQueue) remExpired() {
- var expIdx *int // index of last item to be expired
- for i, item := range sq.sqItems {
- if item.ExpiryTime == nil {
- break
- }
- if item.ExpiryTime.After(time.Now()) {
- break
- }
- sq.remEventWithID(item.EventID)
- item = nil // garbage collected asap
- expIdx = &i
- }
- if expIdx == nil {
- return
- }
- nextValidIdx := *expIdx + 1
- sq.sqItems = sq.sqItems[nextValidIdx:]
-}
-
-// remOnQueueLength rems elements based on QueueLength setting
-func (sq *StatQueue) remOnQueueLength() {
- if sq.sqp.QueueLength == 0 {
- return
- }
- if len(sq.sqItems) == sq.sqp.QueueLength { // reached limit, rem first element
- itm := sq.sqItems[0]
- sq.remEventWithID(itm.EventID)
- itm = nil
- sq.sqItems = sq.sqItems[1:]
- }
-}
-
-// addStatsEvent computes metrics for an event
-func (sq *StatQueue) addStatsEvent(ev engine.StatsEvent) {
- evID := ev.ID()
- for metricID, metric := range sq.sqMetrics {
- if err := metric.AddEvent(ev); err != nil {
- utils.Logger.Warning(fmt.Sprintf(" metricID: %s, add eventID: %s, error: %s",
- metricID, evID, err.Error()))
- }
- }
-}
-
-// remStatsEvent removes an event from metrics
-func (sq *StatQueue) remEventWithID(evID string) {
- ev := sq.sec.GetEvent(evID)
- if ev == nil {
- utils.Logger.Warning(fmt.Sprintf(" removing eventID: %s, error: event not cached", evID))
- return
- }
- for metricID, metric := range sq.sqMetrics {
- if err := metric.RemEvent(ev); err != nil {
- utils.Logger.Warning(fmt.Sprintf(" metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error()))
- }
- }
-}
diff --git a/utils/consts.go b/utils/consts.go
index aba116bb0..a6e0ba31f 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -256,9 +256,9 @@ const (
LOG_ERR = "ler_"
LOG_CDR = "cdr_"
LOG_MEDIATED_CDR = "mcd_"
- SQStoredMetricsPrefix = "ssm_"
StatQueueProfilePrefix = "sqp_"
ThresholdCfgPrefix = "thc_"
+ StatQueuePrefix = "stq_"
LOADINST_KEY = "load_history"
SESSION_MANAGER_SOURCE = "SMR"
MEDIATOR_SOURCE = "MED"