mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Refactoring stats, disabled for now in sources
This commit is contained in:
@@ -17,6 +17,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package v1
|
||||
|
||||
/*
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
@@ -136,3 +137,4 @@ func (apierV1 *ApierV1) RemStatConfig(attrs AttrGetStatsCfg, reply *string) erro
|
||||
return nil
|
||||
|
||||
}
|
||||
*/
|
||||
|
||||
@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package v1
|
||||
|
||||
/*
|
||||
import (
|
||||
"math/rand"
|
||||
"net/rpc"
|
||||
@@ -346,3 +347,4 @@ func BenchmarkStatSV1GetStringMetrics(b *testing.B) {
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
@@ -39,7 +39,6 @@ import (
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/sessionmanager"
|
||||
"github.com/cgrates/cgrates/stats"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
@@ -560,6 +559,7 @@ func startResourceService(internalRsChan, internalStatSConn chan rpcclient.RpcCl
|
||||
internalRsChan <- rsV1
|
||||
}
|
||||
|
||||
/*
|
||||
// startStatService fires up the StatS
|
||||
func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
|
||||
dataDB engine.DataDB, ms engine.Marshaler, server *utils.Server, exitChan chan bool) {
|
||||
@@ -582,6 +582,7 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg
|
||||
server.RpcRegister(stsV1)
|
||||
internalStatSChan <- stsV1
|
||||
}
|
||||
*/
|
||||
|
||||
func startRpc(server *utils.Server, internalRaterChan,
|
||||
internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan,
|
||||
@@ -696,17 +697,6 @@ func main() {
|
||||
// Init cache
|
||||
cache.NewCache(cfg.CacheConfig)
|
||||
|
||||
var ms engine.Marshaler
|
||||
if ms, err = engine.NewMarshaler(cfg.DBDataEncoding); err != nil {
|
||||
log.Fatalf("error initializing marshaler: ", err)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not start, error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
|
||||
var dataDB engine.DataDB
|
||||
var loadDb engine.LoadStorage
|
||||
var cdrDb engine.CdrStorage
|
||||
@@ -857,9 +847,9 @@ func main() {
|
||||
internalStatSChan, cfg, dataDB, server, exitChan)
|
||||
}
|
||||
|
||||
if cfg.StatSCfg().Enabled {
|
||||
go startStatService(internalStatSChan, cfg, dataDB, ms, server, exitChan)
|
||||
}
|
||||
//if cfg.StatSCfg().Enabled {
|
||||
// go startStatService(internalStatSChan, cfg, dataDB, ms, server, exitChan)
|
||||
//}
|
||||
|
||||
// Serve rpc connections
|
||||
go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan,
|
||||
|
||||
@@ -19,6 +19,7 @@ package engine
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -30,14 +31,6 @@ type SQItem struct {
|
||||
ExpiryTime *time.Time // Used to auto-expire events
|
||||
}
|
||||
|
||||
// SQStoredMetrics contains metrics saved in DB
|
||||
type SQStoredMetrics struct {
|
||||
SqID string // StatsInstanceID
|
||||
SEvents map[string]StatsEvent // Events used by SQItems
|
||||
SQItems []*SQItem // SQItems
|
||||
SQMetrics map[string][]byte
|
||||
}
|
||||
|
||||
// StatsConfig represents the configuration of a StatsInstance in StatS
|
||||
type StatsConfig struct {
|
||||
ID string // QueueID
|
||||
@@ -78,3 +71,123 @@ func (se StatsEvent) AnswerTime(timezone string) (at time.Time, err error) {
|
||||
}
|
||||
return utils.ParseTimeDetectLayout(atStr, timezone)
|
||||
}
|
||||
|
||||
// StatQueue represents an individual stats instance
|
||||
type StatQueue struct {
|
||||
ID string
|
||||
SQItems []*SQItem // SQItems
|
||||
SQMetrics map[string]StatsMetric
|
||||
sqItems []*SQItem
|
||||
sqPrfl *StatsConfig
|
||||
dirty *bool // needs save
|
||||
}
|
||||
|
||||
/*
|
||||
// GetSQStoredMetrics retrieves the data used for store to DB
|
||||
func (sq *StatQueue) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
|
||||
sq.RLock()
|
||||
defer sq.RUnlock()
|
||||
sEvents := make(map[string]engine.StatsEvent)
|
||||
var sItems []*engine.SQItem
|
||||
for _, sqItem := range sq.sqItems { // make sure event is properly retrieved from cache
|
||||
ev := sq.sec.GetEvent(sqItem.EventID)
|
||||
if ev == nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<StatQueue> querying for storage eventID: %s, error: event not cached",
|
||||
sqItem.EventID))
|
||||
continue
|
||||
}
|
||||
sEvents[sqItem.EventID] = ev
|
||||
sItems = append(sItems, sqItem)
|
||||
}
|
||||
sqSM = &engine.SQStoredMetrics{
|
||||
SEvents: sEvents,
|
||||
SQItems: sItems,
|
||||
SQMetrics: make(map[string][]byte, len(sq.sqMetrics))}
|
||||
for metricID, metric := range sq.sqMetrics {
|
||||
var err error
|
||||
if sqSM.SQMetrics[metricID], err = metric.GetMarshaled(sq.ms); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<StatQueue> querying for storage metricID: %s, error: %s",
|
||||
metricID, err.Error()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ProcessEvent processes a StatsEvent, returns true if processed
|
||||
func (sq *StatQueue) ProcessEvent(ev engine.StatsEvent) (err error) {
|
||||
sq.Lock()
|
||||
sq.remExpired()
|
||||
sq.remOnQueueLength()
|
||||
sq.addStatsEvent(ev)
|
||||
sq.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// remExpired expires items in queue
|
||||
func (sq *StatQueue) remExpired() {
|
||||
var expIdx *int // index of last item to be expired
|
||||
for i, item := range sq.sqItems {
|
||||
if item.ExpiryTime == nil {
|
||||
break
|
||||
}
|
||||
if item.ExpiryTime.After(time.Now()) {
|
||||
break
|
||||
}
|
||||
sq.remEventWithID(item.EventID)
|
||||
item = nil // garbage collected asap
|
||||
expIdx = &i
|
||||
}
|
||||
if expIdx == nil {
|
||||
return
|
||||
}
|
||||
nextValidIdx := *expIdx + 1
|
||||
sq.sqItems = sq.sqItems[nextValidIdx:]
|
||||
}
|
||||
|
||||
// remOnQueueLength rems elements based on QueueLength setting
|
||||
func (sq *StatQueue) remOnQueueLength() {
|
||||
if sq.cfg.QueueLength == 0 {
|
||||
return
|
||||
}
|
||||
if len(sq.sqItems) == sq.cfg.QueueLength { // reached limit, rem first element
|
||||
itm := sq.sqItems[0]
|
||||
sq.remEventWithID(itm.EventID)
|
||||
itm = nil
|
||||
sq.sqItems = sq.sqItems[1:]
|
||||
}
|
||||
}
|
||||
|
||||
// addStatsEvent computes metrics for an event
|
||||
func (sq *StatQueue) addStatsEvent(ev engine.StatsEvent) {
|
||||
evID := ev.ID()
|
||||
for metricID, metric := range sq.sqMetrics {
|
||||
if err := metric.AddEvent(ev); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, add eventID: %s, error: %s",
|
||||
metricID, evID, err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// remStatsEvent removes an event from metrics
|
||||
func (sq *StatQueue) remEventWithID(evID string) {
|
||||
ev := sq.sec.GetEvent(evID)
|
||||
if ev == nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<StatQueue> removing eventID: %s, error: event not cached", evID))
|
||||
return
|
||||
}
|
||||
for metricID, metric := range sq.sqMetrics {
|
||||
if err := metric.RemEvent(ev); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
// StatQueues is a sortable list of StatQueue
|
||||
type StatQueues []*StatQueue
|
||||
|
||||
// Sort is part of sort interface, sort based on Weight
|
||||
func (sis StatQueues) Sort() {
|
||||
sort.Slice(sis, func(i, j int) bool { return sis[i].sqPrfl.Weight > sis[j].sqPrfl.Weight })
|
||||
}
|
||||
|
||||
@@ -117,9 +117,9 @@ type DataDB interface {
|
||||
GetStatsConfig(sqID string) (sq *StatsConfig, err error)
|
||||
SetStatsConfig(sq *StatsConfig) (err error)
|
||||
RemStatsConfig(sqID string) (err error)
|
||||
GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error)
|
||||
SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error)
|
||||
RemSQStoredMetrics(sqID string) (err error)
|
||||
GetStatQueue(sqID string) (sqSM *StatQueue, err error)
|
||||
SetStatQueue(sq *StatQueue) (err error)
|
||||
RemStatQueue(sqID string) (err error)
|
||||
GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error)
|
||||
SetThresholdCfg(th *ThresholdCfg) (err error)
|
||||
RemThresholdCfg(ID string, transactionID string) (err error)
|
||||
|
||||
@@ -1564,36 +1564,36 @@ func (ms *MapStorage) RemStatsConfig(scfID string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// GetSQStoredMetrics retrieves the stored metrics for a StatsQueue
|
||||
func (ms *MapStorage) GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error) {
|
||||
// GetStatQueue retrieves the stored metrics for a StatsQueue
|
||||
func (ms *MapStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
values, ok := ms.dict[utils.SQStoredMetricsPrefix+sqID]
|
||||
values, ok := ms.dict[utils.StatQueuePrefix+sqID]
|
||||
if !ok {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
err = ms.ms.Unmarshal(values, &sqSM)
|
||||
err = ms.ms.Unmarshal(values, &sq)
|
||||
return
|
||||
}
|
||||
|
||||
// SetStoredSQ stores the metrics for a StatsQueue
|
||||
func (ms *MapStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) {
|
||||
// SetStatQueue stores the metrics for a StatsQueue
|
||||
func (ms *MapStorage) SetStatQueue(sq *StatQueue) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
var result []byte
|
||||
result, err = ms.ms.Marshal(sqSM)
|
||||
result, err = ms.ms.Marshal(sq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ms.dict[utils.SQStoredMetricsPrefix+sqSM.SqID] = result
|
||||
ms.dict[utils.StatQueuePrefix+sq.ID] = result
|
||||
return
|
||||
}
|
||||
|
||||
// RemSQStoredMetrics removes stored metrics for a StatsQueue
|
||||
func (ms *MapStorage) RemSQStoredMetrics(sqID string) (err error) {
|
||||
// RemStatQueue removes a StatsQueue
|
||||
func (ms *MapStorage) RemStatQueue(sqID string) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
delete(ms.dict, utils.SQStoredMetricsPrefix+sqID)
|
||||
delete(ms.dict, utils.StatQueuePrefix+sqID)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -61,6 +61,7 @@ const (
|
||||
colRFI = "request_filter_indexes"
|
||||
colTmg = "timings"
|
||||
colRes = "resources"
|
||||
colStQs = "statqueues"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -2082,11 +2083,11 @@ func (ms *MongoStorage) RemStatsConfig(sqID string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// GetSQStoredMetrics retrieves the stored metrics for a StatsQueue
|
||||
func (ms *MongoStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics, err error) {
|
||||
session, col := ms.conn(utils.SQStoredMetricsPrefix)
|
||||
// GetStatQueue retrieves a StatsQueue
|
||||
func (ms *MongoStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) {
|
||||
session, col := ms.conn(colStQs)
|
||||
defer session.Close()
|
||||
if err = col.Find(bson.M{"sqid": sqmID}).One(&sqSM); err != nil {
|
||||
if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
err = utils.ErrNotFound
|
||||
}
|
||||
@@ -2096,16 +2097,16 @@ func (ms *MongoStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics,
|
||||
}
|
||||
|
||||
// SetStoredSQ stores the metrics for a StatsQueue
|
||||
func (ms *MongoStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) {
|
||||
session, col := ms.conn(utils.SQStoredMetricsPrefix)
|
||||
func (ms *MongoStorage) SetStatQueue(sq *StatQueue) (err error) {
|
||||
session, col := ms.conn(colStQs)
|
||||
defer session.Close()
|
||||
_, err = col.UpsertId(bson.M{"sqid": sqSM.SqID}, sqSM)
|
||||
_, err = col.Upsert(bson.M{"id": sq.ID}, sq)
|
||||
return
|
||||
}
|
||||
|
||||
// RemSQStoredMetrics removes stored metrics for a StatsQueue
|
||||
func (ms *MongoStorage) RemSQStoredMetrics(sqmID string) (err error) {
|
||||
session, col := ms.conn(utils.SQStoredMetricsPrefix)
|
||||
// RemStatQueue removes stored metrics for a StatsQueue
|
||||
func (ms *MongoStorage) RemStatQueue(sqmID string) (err error) {
|
||||
session, col := ms.conn(colStQs)
|
||||
defer session.Close()
|
||||
err = col.Remove(bson.M{"sqid": sqmID})
|
||||
return err
|
||||
|
||||
@@ -1628,9 +1628,9 @@ func (rs *RedisStorage) RemStatsConfig(sqID string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// GetSQStoredMetrics retrieves the stored metrics for a StatsQueue
|
||||
func (rs *RedisStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics, err error) {
|
||||
key := utils.SQStoredMetricsPrefix + sqmID
|
||||
// GetStatQueue retrieves the stored metrics for a StatsQueue
|
||||
func (rs *RedisStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) {
|
||||
key := utils.StatQueuePrefix + sqID
|
||||
var values []byte
|
||||
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
|
||||
if err == redis.ErrRespNil {
|
||||
@@ -1638,25 +1638,25 @@ func (rs *RedisStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics,
|
||||
}
|
||||
return
|
||||
}
|
||||
if err = rs.ms.Unmarshal(values, &sqSM); err != nil {
|
||||
if err = rs.ms.Unmarshal(values, &sq); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// SetStoredSQ stores the metrics for a StatsQueue
|
||||
func (rs *RedisStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) {
|
||||
// SetStatQueue stores the metrics for a StatsQueue
|
||||
func (rs *RedisStorage) SetStatQueue(sq *StatQueue) (err error) {
|
||||
var result []byte
|
||||
result, err = rs.ms.Marshal(sqSM)
|
||||
result, err = rs.ms.Marshal(sq)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return rs.Cmd("SET", utils.SQStoredMetricsPrefix+sqSM.SqID, result).Err
|
||||
return rs.Cmd("SET", utils.StatQueuePrefix+sq.ID, result).Err
|
||||
}
|
||||
|
||||
// RemSQStoredMetrics removes stored metrics for a StatsQueue
|
||||
func (rs *RedisStorage) RemSQStoredMetrics(sqmID string) (err error) {
|
||||
key := utils.SQStoredMetricsPrefix + sqmID
|
||||
// RemStatQueue removes a StatsQueue
|
||||
func (rs *RedisStorage) RemStatQueue(sqID string) (err error) {
|
||||
key := utils.StatQueuePrefix + sqID
|
||||
if err = rs.Cmd("DEL", key).Err; err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
63
stats/acd.go
63
stats/acd.go
@@ -1,63 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package stats
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
)
|
||||
|
||||
func NewACD() (StatsMetric, error) {
|
||||
return new(ACD), nil
|
||||
}
|
||||
|
||||
// ACD implements AverageCallDuration metric
|
||||
type ACD struct {
|
||||
Sum time.Duration
|
||||
Count int
|
||||
}
|
||||
|
||||
func (acd *ACD) GetStringValue(fmtOpts string) (val string) {
|
||||
return
|
||||
}
|
||||
|
||||
func (acd *ACD) GetValue() (v interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
func (acd *ACD) GetFloat64Value() (v float64) {
|
||||
return float64(engine.STATS_NA)
|
||||
}
|
||||
|
||||
func (acd *ACD) AddEvent(ev engine.StatsEvent) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (acd *ACD) RemEvent(ev engine.StatsEvent) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (acd *ACD) GetMarshaled(ms engine.Marshaler) (vals []byte, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (acd *ACD) SetFromMarshaled(vals []byte, ms engine.Marshaler) (err error) {
|
||||
return
|
||||
}
|
||||
87
stats/asr.go
87
stats/asr.go
@@ -1,87 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package stats
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewASR() (StatsMetric, error) {
|
||||
return new(ASR), nil
|
||||
}
|
||||
|
||||
// ASR implements AverageSuccessRatio metric
|
||||
type ASR struct {
|
||||
Answered float64
|
||||
Count float64
|
||||
}
|
||||
|
||||
func (asr *ASR) GetValue() (v interface{}) {
|
||||
if asr.Count == 0 {
|
||||
return float64(engine.STATS_NA)
|
||||
}
|
||||
return utils.Round((asr.Answered / asr.Count * 100),
|
||||
config.CgrConfig().RoundingDecimals, utils.ROUNDING_MIDDLE)
|
||||
}
|
||||
|
||||
func (asr *ASR) GetStringValue(fmtOpts string) (valStr string) {
|
||||
if asr.Count == 0 {
|
||||
return utils.NOT_AVAILABLE
|
||||
}
|
||||
val := asr.GetValue().(float64)
|
||||
return fmt.Sprintf("%v%%", val) // %v will automatically limit the number of decimals printed
|
||||
}
|
||||
|
||||
func (asr *ASR) GetFloat64Value() (val float64) {
|
||||
return asr.GetValue().(float64)
|
||||
}
|
||||
|
||||
func (asr *ASR) AddEvent(ev engine.StatsEvent) (err error) {
|
||||
if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil &&
|
||||
err != utils.ErrNotFound {
|
||||
return err
|
||||
} else if !at.IsZero() {
|
||||
asr.Answered += 1
|
||||
}
|
||||
asr.Count += 1
|
||||
return
|
||||
}
|
||||
|
||||
func (asr *ASR) RemEvent(ev engine.StatsEvent) (err error) {
|
||||
if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil &&
|
||||
err != utils.ErrNotFound {
|
||||
return err
|
||||
} else if !at.IsZero() {
|
||||
asr.Answered -= 1
|
||||
}
|
||||
asr.Count -= 1
|
||||
return
|
||||
}
|
||||
|
||||
func (asr *ASR) GetMarshaled(ms engine.Marshaler) (vals []byte, err error) {
|
||||
return ms.Marshal(asr)
|
||||
}
|
||||
|
||||
func (asr *ASR) SetFromMarshaled(vals []byte, ms engine.Marshaler) (err error) {
|
||||
return ms.Unmarshal(vals, asr)
|
||||
}
|
||||
@@ -1,85 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package stats
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func TestASRGetStringValue(t *testing.T) {
|
||||
asr, _ := NewASR()
|
||||
if strVal := asr.GetStringValue(""); strVal != utils.NOT_AVAILABLE {
|
||||
t.Errorf("wrong asr value: %s", strVal)
|
||||
}
|
||||
ev := engine.StatsEvent{
|
||||
"AnswerTime": time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)}
|
||||
asr.AddEvent(ev)
|
||||
if strVal := asr.GetStringValue(""); strVal != "100%" {
|
||||
t.Errorf("wrong asr value: %s", strVal)
|
||||
}
|
||||
asr.AddEvent(engine.StatsEvent{})
|
||||
asr.AddEvent(engine.StatsEvent{})
|
||||
if strVal := asr.GetStringValue(""); strVal != "33.33333%" {
|
||||
t.Errorf("wrong asr value: %s", strVal)
|
||||
}
|
||||
asr.RemEvent(engine.StatsEvent{})
|
||||
if strVal := asr.GetStringValue(""); strVal != "50%" {
|
||||
t.Errorf("wrong asr value: %s", strVal)
|
||||
}
|
||||
asr.RemEvent(ev)
|
||||
if strVal := asr.GetStringValue(""); strVal != "0%" {
|
||||
t.Errorf("wrong asr value: %s", strVal)
|
||||
}
|
||||
asr.RemEvent(engine.StatsEvent{})
|
||||
if strVal := asr.GetStringValue(""); strVal != utils.NOT_AVAILABLE {
|
||||
t.Errorf("wrong asr value: %s", strVal)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestASRGetValue(t *testing.T) {
|
||||
asr, _ := NewASR()
|
||||
ev := engine.StatsEvent{
|
||||
"AnswerTime": time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC),
|
||||
}
|
||||
asr.AddEvent(ev)
|
||||
if v := asr.GetValue(); v != 100.0 {
|
||||
t.Errorf("wrong asr value: %f", v)
|
||||
}
|
||||
asr.AddEvent(engine.StatsEvent{})
|
||||
asr.AddEvent(engine.StatsEvent{})
|
||||
if v := asr.GetValue(); v != 33.33333 {
|
||||
t.Errorf("wrong asr value: %f", v)
|
||||
}
|
||||
asr.RemEvent(engine.StatsEvent{})
|
||||
if v := asr.GetValue(); v != 50.0 {
|
||||
t.Errorf("wrong asr value: %f", v)
|
||||
}
|
||||
asr.RemEvent(ev)
|
||||
if v := asr.GetValue(); v != 0.0 {
|
||||
t.Errorf("wrong asr value: %f", v)
|
||||
}
|
||||
asr.RemEvent(engine.StatsEvent{})
|
||||
if v := asr.GetValue(); v != -1.0 {
|
||||
t.Errorf("wrong asr value: %f", v)
|
||||
}
|
||||
}
|
||||
@@ -1,72 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package stats
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// NewStatsEventCache instantiates a StatsEventCache
|
||||
func NewStatsEventCache() *StatsEventCache {
|
||||
return &StatsEventCache{
|
||||
evCacheIdx: make(map[string]utils.StringMap),
|
||||
evCache: make(map[string]engine.StatsEvent)}
|
||||
}
|
||||
|
||||
// StatsEventCache keeps a cache of StatsEvents which are referenced by StatsQueues
|
||||
type StatsEventCache struct {
|
||||
sync.RWMutex
|
||||
evCacheIdx map[string]utils.StringMap // index events used in queues, map[eventID]map[queueID]bool
|
||||
evCache map[string]engine.StatsEvent // cache for the processed events
|
||||
}
|
||||
|
||||
// Cache will cache an event and reference it in the index
|
||||
func (sec *StatsEventCache) Cache(evID string, ev engine.StatsEvent, queueID string) {
|
||||
if utils.IsSliceMember([]string{evID, queueID}, "") {
|
||||
return
|
||||
}
|
||||
sec.Lock()
|
||||
if _, hasIt := sec.evCache[evID]; !hasIt {
|
||||
sec.evCache[evID] = ev
|
||||
}
|
||||
sec.evCacheIdx[evID][queueID] = true
|
||||
sec.Unlock()
|
||||
}
|
||||
|
||||
func (sec *StatsEventCache) UnCache(evID string, ev engine.StatsEvent, queueID string) {
|
||||
sec.Lock()
|
||||
if _, hasIt := sec.evCache[evID]; !hasIt {
|
||||
return
|
||||
}
|
||||
delete(sec.evCacheIdx[evID], queueID)
|
||||
if len(sec.evCacheIdx[evID]) == 0 {
|
||||
delete(sec.evCacheIdx, evID)
|
||||
delete(sec.evCache, evID)
|
||||
}
|
||||
sec.Unlock()
|
||||
}
|
||||
|
||||
// GetEvent returns the event based on ID
|
||||
func (sec *StatsEventCache) GetEvent(evID string) engine.StatsEvent {
|
||||
sec.RLock()
|
||||
defer sec.RUnlock()
|
||||
return sec.evCache[evID]
|
||||
}
|
||||
@@ -1,50 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package stats
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// NewStatsMetrics instantiates the StatsMetrics
|
||||
// cfg serves as general purpose container to pass config options to metric
|
||||
func NewStatsMetric(metricID string) (sm StatsMetric, err error) {
|
||||
metrics := map[string]func() (StatsMetric, error){
|
||||
utils.MetaASR: NewASR,
|
||||
utils.MetaACD: NewACD,
|
||||
}
|
||||
if _, has := metrics[metricID]; !has {
|
||||
return nil, fmt.Errorf("unsupported metric: %s", metricID)
|
||||
}
|
||||
return metrics[metricID]()
|
||||
}
|
||||
|
||||
// StatsMetric is the interface which a metric should implement
|
||||
type StatsMetric interface {
|
||||
GetValue() interface{}
|
||||
GetStringValue(fmtOpts string) (val string)
|
||||
GetFloat64Value() (val float64)
|
||||
AddEvent(ev engine.StatsEvent) error
|
||||
RemEvent(ev engine.StatsEvent) error
|
||||
GetMarshaled(ms engine.Marshaler) (vals []byte, err error)
|
||||
SetFromMarshaled(vals []byte, ms engine.Marshaler) (err error) // mostly used to load from DB
|
||||
}
|
||||
189
stats/queue.go
189
stats/queue.go
@@ -1,189 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package stats
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// StatQueues is a sortable list of StatQueue
|
||||
type StatQueues []*StatQueue
|
||||
|
||||
// Sort is part of sort interface, sort based on Weight
|
||||
func (sis StatQueues) Sort() {
|
||||
sort.Slice(sis, func(i, j int) bool { return sis[i].cfg.Weight > sis[j].cfg.Weight })
|
||||
}
|
||||
|
||||
// remWithID removes the queue with ID from slice
|
||||
func (sis StatQueues) remWithID(qID string) {
|
||||
for i, q := range sis {
|
||||
if q.cfg.ID == qID {
|
||||
copy(sis[i:], sis[i+1:])
|
||||
sis[len(sis)-1] = nil
|
||||
sis = sis[:len(sis)-1]
|
||||
break // there can be only one item with ID
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewStatQueue instantiates a StatQueue
|
||||
func NewStatQueue(sec *StatsEventCache, ms engine.Marshaler,
|
||||
sqCfg *engine.StatsConfig, sqSM *engine.SQStoredMetrics) (si *StatQueue, err error) {
|
||||
si = &StatQueue{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)}
|
||||
for _, metricID := range sqCfg.Metrics {
|
||||
if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if sqSM != nil {
|
||||
for evID, ev := range sqSM.SEvents {
|
||||
si.sec.Cache(evID, ev, si.cfg.ID)
|
||||
}
|
||||
si.sqItems = sqSM.SQItems
|
||||
for metricID := range si.sqMetrics {
|
||||
if _, has := si.sqMetrics[metricID]; !has {
|
||||
if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if stored, has := sqSM.SQMetrics[metricID]; !has {
|
||||
continue
|
||||
} else if err = si.sqMetrics[metricID].SetFromMarshaled(stored, ms); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// StatQueue represents an individual stats instance
|
||||
type StatQueue struct {
|
||||
sync.RWMutex
|
||||
dirty bool // needs save
|
||||
sec *StatsEventCache
|
||||
sqItems []*engine.SQItem
|
||||
sqMetrics map[string]StatsMetric
|
||||
ms engine.Marshaler // used to get/set Metrics
|
||||
cfg *engine.StatsConfig
|
||||
}
|
||||
|
||||
// GetSQStoredMetrics retrieves the data used for store to DB
|
||||
func (sq *StatQueue) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {
|
||||
sq.RLock()
|
||||
defer sq.RUnlock()
|
||||
sEvents := make(map[string]engine.StatsEvent)
|
||||
var sItems []*engine.SQItem
|
||||
for _, sqItem := range sq.sqItems { // make sure event is properly retrieved from cache
|
||||
ev := sq.sec.GetEvent(sqItem.EventID)
|
||||
if ev == nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<StatQueue> querying for storage eventID: %s, error: event not cached",
|
||||
sqItem.EventID))
|
||||
continue
|
||||
}
|
||||
sEvents[sqItem.EventID] = ev
|
||||
sItems = append(sItems, sqItem)
|
||||
}
|
||||
sqSM = &engine.SQStoredMetrics{
|
||||
SEvents: sEvents,
|
||||
SQItems: sItems,
|
||||
SQMetrics: make(map[string][]byte, len(sq.sqMetrics))}
|
||||
for metricID, metric := range sq.sqMetrics {
|
||||
var err error
|
||||
if sqSM.SQMetrics[metricID], err = metric.GetMarshaled(sq.ms); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<StatQueue> querying for storage metricID: %s, error: %s",
|
||||
metricID, err.Error()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ProcessEvent processes a StatsEvent, returns true if processed
|
||||
func (sq *StatQueue) ProcessEvent(ev engine.StatsEvent) (err error) {
|
||||
sq.Lock()
|
||||
sq.remExpired()
|
||||
sq.remOnQueueLength()
|
||||
sq.addStatsEvent(ev)
|
||||
sq.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// remExpired expires items in queue
|
||||
func (sq *StatQueue) remExpired() {
|
||||
var expIdx *int // index of last item to be expired
|
||||
for i, item := range sq.sqItems {
|
||||
if item.ExpiryTime == nil {
|
||||
break
|
||||
}
|
||||
if item.ExpiryTime.After(time.Now()) {
|
||||
break
|
||||
}
|
||||
sq.remEventWithID(item.EventID)
|
||||
item = nil // garbage collected asap
|
||||
expIdx = &i
|
||||
}
|
||||
if expIdx == nil {
|
||||
return
|
||||
}
|
||||
nextValidIdx := *expIdx + 1
|
||||
sq.sqItems = sq.sqItems[nextValidIdx:]
|
||||
}
|
||||
|
||||
// remOnQueueLength rems elements based on QueueLength setting
|
||||
func (sq *StatQueue) remOnQueueLength() {
|
||||
if sq.cfg.QueueLength == 0 {
|
||||
return
|
||||
}
|
||||
if len(sq.sqItems) == sq.cfg.QueueLength { // reached limit, rem first element
|
||||
itm := sq.sqItems[0]
|
||||
sq.remEventWithID(itm.EventID)
|
||||
itm = nil
|
||||
sq.sqItems = sq.sqItems[1:]
|
||||
}
|
||||
}
|
||||
|
||||
// addStatsEvent computes metrics for an event
|
||||
func (sq *StatQueue) addStatsEvent(ev engine.StatsEvent) {
|
||||
evID := ev.ID()
|
||||
for metricID, metric := range sq.sqMetrics {
|
||||
if err := metric.AddEvent(ev); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, add eventID: %s, error: %s",
|
||||
metricID, evID, err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// remStatsEvent removes an event from metrics
|
||||
func (sq *StatQueue) remEventWithID(evID string) {
|
||||
ev := sq.sec.GetEvent(evID)
|
||||
if ev == nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<StatQueue> removing eventID: %s, error: event not cached", evID))
|
||||
return
|
||||
}
|
||||
for metricID, metric := range sq.sqMetrics {
|
||||
if err := metric.RemEvent(ev); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<StatQueue> metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,44 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package stats
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
)
|
||||
|
||||
func TestStatQueuesSort(t *testing.T) {
|
||||
sInsts := StatQueues{
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
|
||||
}
|
||||
sInsts.Sort()
|
||||
eSInst := StatQueues{
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
|
||||
}
|
||||
if !reflect.DeepEqual(eSInst, sInsts) {
|
||||
t.Errorf("expecting: %+v, received: %+v", eSInst, sInsts)
|
||||
}
|
||||
}
|
||||
290
stats/service.go
290
stats/service.go
@@ -1,290 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package stats
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
func init() {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// NewStatService initializes a StatService
|
||||
func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval time.Duration) (ss *StatService, err error) {
|
||||
ss = &StatService{dataDB: dataDB, ms: ms, storeInterval: storeInterval,
|
||||
stopStoring: make(chan struct{}), evCache: NewStatsEventCache()}
|
||||
sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsConfigPrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ss.queuesCache = make(map[string]*StatQueue)
|
||||
ss.queues = make(StatQueues, 0)
|
||||
for _, prfx := range sqPrfxs {
|
||||
if q, err := ss.loadQueue(prfx[len(utils.StatsConfigPrefix):]); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<StatS> failed loading quueue with id: <%s>, err: <%s>",
|
||||
q.cfg.ID, err.Error()))
|
||||
continue
|
||||
} else {
|
||||
ss.setQueue(q)
|
||||
}
|
||||
}
|
||||
ss.queues.Sort()
|
||||
go ss.dumpStoredMetrics() // start dumpStoredMetrics loop
|
||||
return
|
||||
}
|
||||
|
||||
// StatService builds stats for events
|
||||
type StatService struct {
|
||||
sync.RWMutex
|
||||
dataDB engine.DataDB
|
||||
ms engine.Marshaler
|
||||
storeInterval time.Duration
|
||||
stopStoring chan struct{}
|
||||
evCache *StatsEventCache // so we can pass it to queues
|
||||
queuesCache map[string]*StatQueue // unordered db of StatQueues, used for fast queries
|
||||
queues StatQueues // ordered list of StatQueues
|
||||
|
||||
}
|
||||
|
||||
// ListenAndServe loops keeps the service alive
|
||||
func (ss *StatService) ListenAndServe(exitChan chan bool) error {
|
||||
e := <-exitChan
|
||||
exitChan <- e // put back for the others listening for shutdown request
|
||||
return nil
|
||||
}
|
||||
|
||||
// Called to shutdown the service
|
||||
// ToDo: improve with context, ie following http implementation
|
||||
func (ss *StatService) Shutdown() error {
|
||||
utils.Logger.Info("<StatS> service shutdown initialized")
|
||||
close(ss.stopStoring)
|
||||
ss.storeMetrics()
|
||||
utils.Logger.Info("<StatS> service shutdown complete")
|
||||
return nil
|
||||
}
|
||||
|
||||
// setQueue adds or modifies a queue into cache
|
||||
// sort will reorder the ss.queues
|
||||
func (ss *StatService) loadQueue(qID string) (q *StatQueue, err error) {
|
||||
sq, err := ss.dataDB.GetStatsConfig(qID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var sqSM *engine.SQStoredMetrics
|
||||
if sq.Store {
|
||||
if sqSM, err = ss.dataDB.GetSQStoredMetrics(sq.ID); err != nil && err != utils.ErrNotFound {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return NewStatQueue(ss.evCache, ss.ms, sq, sqSM)
|
||||
}
|
||||
|
||||
func (ss *StatService) setQueue(q *StatQueue) {
|
||||
ss.queuesCache[q.cfg.ID] = q
|
||||
ss.queues = append(ss.queues, q)
|
||||
}
|
||||
|
||||
// remQueue will remove a queue based on it's ID
|
||||
func (ss *StatService) remQueue(qID string) (si *StatQueue) {
|
||||
si = ss.queuesCache[qID]
|
||||
ss.queues.remWithID(qID)
|
||||
delete(ss.queuesCache, qID)
|
||||
return
|
||||
}
|
||||
|
||||
// store stores the necessary storedMetrics to dataDB
|
||||
func (ss *StatService) storeMetrics() {
|
||||
for _, si := range ss.queues {
|
||||
if !si.cfg.Store || !si.dirty { // no need to save
|
||||
continue
|
||||
}
|
||||
if siSM := si.GetStoredMetrics(); siSM != nil {
|
||||
if err := ss.dataDB.SetSQStoredMetrics(siSM); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<StatService> failed saving StoredMetrics for QueueID: %s, error: %s",
|
||||
si.cfg.ID, err.Error()))
|
||||
}
|
||||
}
|
||||
// randomize the CPU load and give up thread control
|
||||
time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// dumpStoredMetrics regularly dumps metrics to dataDB
|
||||
func (ss *StatService) dumpStoredMetrics() {
|
||||
for {
|
||||
select {
|
||||
case <-ss.stopStoring:
|
||||
return
|
||||
}
|
||||
ss.storeMetrics()
|
||||
time.Sleep(ss.storeInterval)
|
||||
}
|
||||
}
|
||||
|
||||
// processEvent processes a StatsEvent through the queues and caches it when needed
|
||||
func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) {
|
||||
evStatsID := ev.ID()
|
||||
if evStatsID == "" { // ID is mandatory
|
||||
return errors.New("missing ID field")
|
||||
}
|
||||
for _, stInst := range ss.queues {
|
||||
if err := stInst.ProcessEvent(ev); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<StatService> QueueID: %s, ignoring event with ID: %s, error: %s",
|
||||
stInst.cfg.ID, evStatsID, err.Error()))
|
||||
}
|
||||
if stInst.cfg.Blocker {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// V1ProcessEvent implements StatV1 method for processing an Event
|
||||
func (ss *StatService) V1ProcessEvent(ev engine.StatsEvent, reply *string) (err error) {
|
||||
if err = ss.processEvent(ev); err == nil {
|
||||
*reply = utils.OK
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// V1GetQueueIDs returns list of queue IDs configured in the service
|
||||
func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err error) {
|
||||
if len(ss.queuesCache) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
for k := range ss.queuesCache {
|
||||
*reply = append(*reply, k)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// V1GetStringMetrics returns the metrics as string values
|
||||
func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]string) (err error) {
|
||||
sq, has := ss.queuesCache[queueID]
|
||||
if !has {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
metrics := make(map[string]string, len(sq.sqMetrics))
|
||||
for metricID, metric := range sq.sqMetrics {
|
||||
metrics[metricID] = metric.GetStringValue("")
|
||||
}
|
||||
*reply = metrics
|
||||
return
|
||||
}
|
||||
|
||||
// V1GetFloatMetrics returns the metrics as float64 values
|
||||
func (ss *StatService) V1GetFloatMetrics(queueID string, reply *map[string]float64) (err error) {
|
||||
sq, has := ss.queuesCache[queueID]
|
||||
if !has {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
metrics := make(map[string]float64, len(sq.sqMetrics))
|
||||
for metricID, metric := range sq.sqMetrics {
|
||||
metrics[metricID] = metric.GetFloat64Value()
|
||||
}
|
||||
*reply = metrics
|
||||
return
|
||||
}
|
||||
|
||||
// ArgsLoadQueues are the arguments passed to V1LoadQueues
|
||||
type ArgsLoadQueues struct {
|
||||
QueueIDs *[]string
|
||||
}
|
||||
|
||||
// V1LoadQueues loads the queues specified by qIDs into the service
|
||||
// loads all if args.QueueIDs is nil
|
||||
func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) {
|
||||
qIDs := args.QueueIDs
|
||||
if qIDs == nil {
|
||||
sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsConfigPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
queueIDs := make([]string, len(sqPrfxs))
|
||||
for i, prfx := range sqPrfxs {
|
||||
queueIDs[i] = prfx[len(utils.StatsConfigPrefix):]
|
||||
}
|
||||
if len(queueIDs) != 0 {
|
||||
qIDs = &queueIDs
|
||||
}
|
||||
}
|
||||
if qIDs == nil || len(*qIDs) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
var sQs []*StatQueue // cache here so we lock only later when data available
|
||||
for _, qID := range *qIDs {
|
||||
if _, hasPrev := ss.queuesCache[qID]; hasPrev {
|
||||
continue // don't overwrite previous, could be extended in the future by carefully checking cached events
|
||||
}
|
||||
if q, err := ss.loadQueue(qID); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<StatS> failed loading quueue with id: <%s>, err: <%s>",
|
||||
q.cfg.ID, err.Error()))
|
||||
continue
|
||||
} else {
|
||||
sQs = append(sQs, q)
|
||||
}
|
||||
}
|
||||
ss.Lock()
|
||||
for _, q := range sQs {
|
||||
ss.setQueue(q)
|
||||
}
|
||||
ss.queues.Sort()
|
||||
ss.Unlock()
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// Call implements rpcclient.RpcClientConnection interface for internal RPC
|
||||
// here for testing purposes
|
||||
func (ss *StatService) Call(serviceMethod string, args interface{}, reply interface{}) error {
|
||||
methodSplit := strings.Split(serviceMethod, ".")
|
||||
if len(methodSplit) != 2 {
|
||||
return rpcclient.ErrUnsupporteServiceMethod
|
||||
}
|
||||
method := reflect.ValueOf(ss).MethodByName(methodSplit[0][len(methodSplit[0])-2:] + methodSplit[1])
|
||||
if !method.IsValid() {
|
||||
return rpcclient.ErrUnsupporteServiceMethod
|
||||
}
|
||||
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
|
||||
ret := method.Call(params)
|
||||
if len(ret) != 1 {
|
||||
return utils.ErrServerError
|
||||
}
|
||||
if ret[0].Interface() == nil {
|
||||
return nil
|
||||
}
|
||||
err, ok := ret[0].Interface().(error)
|
||||
if !ok {
|
||||
return utils.ErrServerError
|
||||
}
|
||||
return err
|
||||
}
|
||||
@@ -1,77 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package stats
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func TestReqFilterPassStatS(t *testing.T) {
|
||||
if cgrCfg := config.CgrConfig(); cgrCfg == nil {
|
||||
cgrCfg, _ = config.NewDefaultCGRConfig()
|
||||
config.SetCgrConfig(cgrCfg)
|
||||
}
|
||||
dataStorage, _ := engine.NewMapStorage()
|
||||
dataStorage.SetStatsConfig(
|
||||
&engine.StatsConfig{ID: "CDRST1",
|
||||
Filters: []*engine.RequestFilter{
|
||||
&engine.RequestFilter{Type: engine.MetaString, FieldName: "Tenant",
|
||||
Values: []string{"cgrates.org"}}},
|
||||
Metrics: []string{utils.MetaASR}})
|
||||
statS, err := NewStatService(dataStorage, dataStorage.Marshaler(), 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var replyStr string
|
||||
if err := statS.Call("StatSV1.LoadQueues", ArgsLoadQueues{},
|
||||
&replyStr); err != nil {
|
||||
t.Error(err)
|
||||
} else if replyStr != utils.OK {
|
||||
t.Errorf("reply received: %s", replyStr)
|
||||
}
|
||||
cdr := &engine.CDR{
|
||||
Tenant: "cgrates.org",
|
||||
Category: "call",
|
||||
AnswerTime: time.Now(),
|
||||
SetupTime: time.Now(),
|
||||
Usage: 10 * time.Second,
|
||||
Cost: 10,
|
||||
Supplier: "suppl1",
|
||||
DisconnectCause: "NORMAL_CLEARNING",
|
||||
}
|
||||
cdrMp, _ := cdr.AsMapStringIface()
|
||||
cdrMp[utils.ID] = "event1"
|
||||
if err := statS.processEvent(cdrMp); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
rf, err := engine.NewRequestFilter(engine.MetaStatS, "",
|
||||
[]string{"CDRST1:*min_asr:20"})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if passes, err := rf.Pass(cdr, "", statS); err != nil {
|
||||
t.Error(err)
|
||||
} else if !passes {
|
||||
t.Error("Not passing")
|
||||
}
|
||||
}
|
||||
@@ -255,9 +255,9 @@ const (
|
||||
LOG_ERR = "ler_"
|
||||
LOG_CDR = "cdr_"
|
||||
LOG_MEDIATED_CDR = "mcd_"
|
||||
SQStoredMetricsPrefix = "ssm_"
|
||||
StatsConfigPrefix = "scf_"
|
||||
ThresholdCfgPrefix = "thc_"
|
||||
StatQueuePrefix = "stq_"
|
||||
LOADINST_KEY = "load_history"
|
||||
SESSION_MANAGER_SOURCE = "SMR"
|
||||
MEDIATOR_SOURCE = "MED"
|
||||
|
||||
Reference in New Issue
Block a user