This commit is contained in:
DanB
2014-07-31 15:28:45 +02:00
6 changed files with 155 additions and 132 deletions

View File

@@ -463,7 +463,7 @@ func main() {
}
if cfg.CDRStatsEnabled {
cdrStats = &engine.Stats{}
cdrStats = engine.NewStats(accountDb)
server.RpcRegister(cdrStats)
server.RpcRegister(apier.CDRStatsV1{cdrStats}) // Public APIs
}

View File

@@ -18,7 +18,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import "time"
import (
"strings"
"time"
"github.com/cgrates/cgrates/utils"
)
type CdrStats struct {
Id string // Config id, unique per config instance
@@ -43,3 +48,79 @@ type CdrStats struct {
CostInterval []float64 // 2 or less items, (>=Cost, <Cost)
Triggers ActionTriggerPriotityList
}
func (cs *CdrStats) AcceptCdr(cdr *utils.StoredCdr) bool {
if len(cs.SetupInterval) > 0 {
if cdr.SetupTime.Before(cs.SetupInterval[0]) {
return false
}
if len(cs.SetupInterval) > 1 && (cdr.SetupTime.Equal(cs.SetupInterval[1]) || cdr.SetupTime.After(cs.SetupInterval[1])) {
return false
}
}
if len(cs.TOR) > 0 && !utils.IsSliceMember(cs.TOR, cdr.TOR) {
return false
}
if len(cs.CdrHost) > 0 && !utils.IsSliceMember(cs.CdrHost, cdr.CdrHost) {
return false
}
if len(cs.CdrSource) > 0 && !utils.IsSliceMember(cs.CdrSource, cdr.CdrSource) {
return false
}
if len(cs.ReqType) > 0 && !utils.IsSliceMember(cs.ReqType, cdr.ReqType) {
return false
}
if len(cs.Direction) > 0 && !utils.IsSliceMember(cs.Direction, cdr.Direction) {
return false
}
if len(cs.Tenant) > 0 && !utils.IsSliceMember(cs.Tenant, cdr.Tenant) {
return false
}
if len(cs.Category) > 0 && !utils.IsSliceMember(cs.Category, cdr.Category) {
return false
}
if len(cs.Account) > 0 && !utils.IsSliceMember(cs.Account, cdr.Account) {
return false
}
if len(cs.Subject) > 0 && !utils.IsSliceMember(cs.Subject, cdr.Subject) {
return false
}
if len(cs.DestinationPrefix) > 0 {
found := false
for _, prefix := range cs.DestinationPrefix {
if strings.HasPrefix(cdr.Destination, prefix) {
found = true
break
}
}
if !found {
return false
}
}
if len(cs.UsageInterval) > 0 {
if cdr.Usage < cs.UsageInterval[0] {
return false
}
if len(cs.UsageInterval) > 1 && cdr.Usage >= cs.UsageInterval[1] {
return false
}
}
if len(cs.MediationRunIds) > 0 && !utils.IsSliceMember(cs.MediationRunIds, cdr.MediationRunId) {
return false
}
if len(cs.CostInterval) > 0 {
if cdr.Cost < cs.CostInterval[0] {
return false
}
if len(cs.CostInterval) > 1 && cdr.Cost >= cs.CostInterval[1] {
return false
}
}
if len(cs.RatedAccount) > 0 && !utils.IsSliceMember(cs.RatedAccount, cdr.RatedAccount) {
return false
}
if len(cs.RatedSubject) > 0 && !utils.IsSliceMember(cs.RatedSubject, cdr.RatedSubject) {
return false
}
return true
}

View File

@@ -20,6 +20,7 @@ package engine
import (
"errors"
"fmt"
"net/rpc"
"sync"
@@ -27,7 +28,7 @@ import (
)
type StatsInterface interface {
AddQueue(*StatsQueue, *int) error
AddQueue(*CdrStats, *int) error
GetValues(string, *map[string]float64) error
AppendCDR(*utils.StoredCdr, *int) error
}
@@ -37,16 +38,30 @@ type Stats struct {
mux sync.RWMutex
}
func (s *Stats) AddQueue(sq *StatsQueue, out *int) error {
func (s *Stats) AddQueue(cs *CdrStats, out *int) error {
s.mux.Lock()
defer s.mux.Unlock()
if s.queues == nil {
s.queues = make(map[string]*StatsQueue)
}
s.queues[sq.conf.Id] = sq
if sq, exists := s.queues[cs.Id]; exists {
sq.UpdateConf(cs)
} else {
s.queues[cs.Id] = NewStatsQueue(cs)
}
return nil
}
func NewStats(accountDb AccountingStorage) *Stats {
cdrStats := &Stats{}
if css, err := accountDb.GetAllCdrStats(); err == nil {
cdrStats.UpdateQueues(css, nil)
} else {
Logger.Err(fmt.Sprintf("Cannot load cdr stats: %v", err))
}
return cdrStats
}
func (s *Stats) GetValues(sqID string, values *map[string]float64) error {
s.mux.RLock()
defer s.mux.RUnlock()
@@ -100,14 +115,14 @@ func NewProxyStats(addr string) (*ProxyStats, error) {
return &ProxyStats{Client: client}, nil
}
func (ps *ProxyStats) AddQueue(sq *StatsQueue, out *int) error {
return ps.Client.Call("Scribe.AddQueue", sq, out)
func (ps *ProxyStats) AddQueue(cs *CdrStats, out *int) error {
return ps.Client.Call("Stats.AddQueue", cs, out)
}
func (ps *ProxyStats) GetValues(sqID string, values *map[string]float64) error {
return ps.Client.Call("Scribe.GetValues", sqID, values)
return ps.Client.Call("Stats.GetValues", sqID, values)
}
func (ps *ProxyStats) AppendCDR(cdr *utils.StoredCdr, out *int) error {
return ps.Client.Call("Scribe.AppendCDR", cdr, out)
return ps.Client.Call("Stats.AppendCDR", cdr, out)
}

View File

@@ -21,8 +21,8 @@ package engine
import "time"
type Metric interface {
AddCDR(*QCDR)
RemoveCDR(*QCDR)
AddCdr(*QCdr)
RemoveCdr(*QCdr)
GetValue() float64
}
@@ -49,14 +49,14 @@ type ASRMetric struct {
total float64
}
func (asr *ASRMetric) AddCDR(cdr *QCDR) {
func (asr *ASRMetric) AddCdr(cdr *QCdr) {
if !cdr.AnswerTime.IsZero() {
asr.answered += 1
}
asr.total += 1
}
func (asr *ASRMetric) RemoveCDR(cdr *QCDR) {
func (asr *ASRMetric) RemoveCdr(cdr *QCdr) {
if !cdr.AnswerTime.IsZero() {
asr.answered -= 1
}
@@ -74,14 +74,14 @@ type ACDMetric struct {
count float64
}
func (acd *ACDMetric) AddCDR(cdr *QCDR) {
func (acd *ACDMetric) AddCdr(cdr *QCdr) {
if !cdr.AnswerTime.IsZero() {
acd.sum += cdr.Usage
acd.count += 1
}
}
func (acd *ACDMetric) RemoveCDR(cdr *QCDR) {
func (acd *ACDMetric) RemoveCdr(cdr *QCdr) {
if !cdr.AnswerTime.IsZero() {
acd.sum -= cdr.Usage
acd.count -= 1
@@ -99,14 +99,14 @@ type ACCMetric struct {
count float64
}
func (acc *ACCMetric) AddCDR(cdr *QCDR) {
func (acc *ACCMetric) AddCdr(cdr *QCdr) {
if !cdr.AnswerTime.IsZero() && cdr.Cost >= 0 {
acc.sum += cdr.Cost
acc.count += 1
}
}
func (acc *ACCMetric) RemoveCDR(cdr *QCDR) {
func (acc *ACCMetric) RemoveCdr(cdr *QCdr) {
if !cdr.AnswerTime.IsZero() && cdr.Cost >= 0 {
acc.sum -= cdr.Cost
acc.count -= 1

View File

@@ -27,14 +27,14 @@ import (
)
type StatsQueue struct {
cdrs []*QCDR
cdrs []*QCdr
conf *CdrStats
metrics map[string]Metric
mux sync.RWMutex
}
// Simplified cdr structure containing only the necessary info
type QCDR struct {
type QCdr struct {
SetupTime time.Time
AnswerTime time.Time
Usage time.Duration
@@ -45,27 +45,30 @@ func NewStatsQueue(conf *CdrStats) *StatsQueue {
if conf == nil {
return &StatsQueue{metrics: make(map[string]Metric)}
}
sq := &StatsQueue{
conf: conf,
metrics: make(map[string]Metric, len(conf.Metrics)),
}
sq := &StatsQueue{}
sq.UpdateConf(conf)
return sq
}
func (sq *StatsQueue) UpdateConf(conf *CdrStats) {
sq.conf = conf
sq.metrics = make(map[string]Metric, len(conf.Metrics))
for _, m := range conf.Metrics {
metric := CreateMetric(m)
if metric != nil {
sq.metrics[m] = metric
}
}
return sq
}
func (sq *StatsQueue) AppendCDR(cdr *utils.StoredCdr) {
sq.mux.Lock()
defer sq.mux.Unlock()
if sq.acceptCDR(cdr) {
qcdr := sq.simplifyCDR(cdr)
if sq.conf.AcceptCdr(cdr) {
qcdr := sq.simplifyCdr(cdr)
sq.cdrs = append(sq.cdrs, qcdr)
sq.addToMetrics(qcdr)
sq.purgeObsoleteCDRs()
sq.purgeObsoleteCdrs()
// check for trigger
stats := sq.getStats()
sq.conf.Triggers.Sort()
@@ -91,20 +94,20 @@ func (sq *StatsQueue) AppendCDR(cdr *utils.StoredCdr) {
}
}
func (sq *StatsQueue) addToMetrics(cdr *QCDR) {
func (sq *StatsQueue) addToMetrics(cdr *QCdr) {
for _, metric := range sq.metrics {
metric.AddCDR(cdr)
metric.AddCdr(cdr)
}
}
func (sq *StatsQueue) removeFromMetrics(cdr *QCDR) {
func (sq *StatsQueue) removeFromMetrics(cdr *QCdr) {
for _, metric := range sq.metrics {
metric.RemoveCDR(cdr)
metric.RemoveCdr(cdr)
}
}
func (sq *StatsQueue) simplifyCDR(cdr *utils.StoredCdr) *QCDR {
return &QCDR{
func (sq *StatsQueue) simplifyCdr(cdr *utils.StoredCdr) *QCdr {
return &QCdr{
SetupTime: cdr.SetupTime,
AnswerTime: cdr.AnswerTime,
Usage: cdr.Usage,
@@ -112,7 +115,7 @@ func (sq *StatsQueue) simplifyCDR(cdr *utils.StoredCdr) *QCDR {
}
}
func (sq *StatsQueue) purgeObsoleteCDRs() {
func (sq *StatsQueue) purgeObsoleteCdrs() {
if sq.conf.QueueLength > 0 {
currentLength := len(sq.cdrs)
if currentLength > sq.conf.QueueLength {
@@ -150,79 +153,3 @@ func (sq *StatsQueue) getStats() map[string]float64 {
}
return stat
}
func (sq *StatsQueue) acceptCDR(cdr *utils.StoredCdr) bool {
if len(sq.conf.SetupInterval) > 0 {
if cdr.SetupTime.Before(sq.conf.SetupInterval[0]) {
return false
}
if len(sq.conf.SetupInterval) > 1 && (cdr.SetupTime.Equal(sq.conf.SetupInterval[1]) || cdr.SetupTime.After(sq.conf.SetupInterval[1])) {
return false
}
}
if len(sq.conf.TOR) > 0 && !utils.IsSliceMember(sq.conf.TOR, cdr.TOR) {
return false
}
if len(sq.conf.CdrHost) > 0 && !utils.IsSliceMember(sq.conf.CdrHost, cdr.CdrHost) {
return false
}
if len(sq.conf.CdrSource) > 0 && !utils.IsSliceMember(sq.conf.CdrSource, cdr.CdrSource) {
return false
}
if len(sq.conf.ReqType) > 0 && !utils.IsSliceMember(sq.conf.ReqType, cdr.ReqType) {
return false
}
if len(sq.conf.Direction) > 0 && !utils.IsSliceMember(sq.conf.Direction, cdr.Direction) {
return false
}
if len(sq.conf.Tenant) > 0 && !utils.IsSliceMember(sq.conf.Tenant, cdr.Tenant) {
return false
}
if len(sq.conf.Category) > 0 && !utils.IsSliceMember(sq.conf.Category, cdr.Category) {
return false
}
if len(sq.conf.Account) > 0 && !utils.IsSliceMember(sq.conf.Account, cdr.Account) {
return false
}
if len(sq.conf.Subject) > 0 && !utils.IsSliceMember(sq.conf.Subject, cdr.Subject) {
return false
}
if len(sq.conf.DestinationPrefix) > 0 {
found := false
for _, prefix := range sq.conf.DestinationPrefix {
if strings.HasPrefix(cdr.Destination, prefix) {
found = true
break
}
}
if !found {
return false
}
}
if len(sq.conf.UsageInterval) > 0 {
if cdr.Usage < sq.conf.UsageInterval[0] {
return false
}
if len(sq.conf.UsageInterval) > 1 && cdr.Usage >= sq.conf.UsageInterval[1] {
return false
}
}
if len(sq.conf.MediationRunIds) > 0 && !utils.IsSliceMember(sq.conf.MediationRunIds, cdr.MediationRunId) {
return false
}
if len(sq.conf.CostInterval) > 0 {
if cdr.Cost < sq.conf.CostInterval[0] {
return false
}
if len(sq.conf.CostInterval) > 1 && cdr.Cost >= sq.conf.CostInterval[1] {
return false
}
}
if len(sq.conf.RatedAccount) > 0 && !utils.IsSliceMember(sq.conf.RatedAccount, cdr.RatedAccount) {
return false
}
if len(sq.conf.RatedSubject) > 0 && !utils.IsSliceMember(sq.conf.RatedSubject, cdr.RatedSubject) {
return false
}
return true
}

View File

@@ -71,7 +71,7 @@ func TestStatsSimplifyCDR(t *testing.T) {
Cost: 10,
}
sq := &StatsQueue{}
qcdr := sq.simplifyCDR(cdr)
qcdr := sq.simplifyCdr(cdr)
if cdr.SetupTime != qcdr.SetupTime ||
cdr.AnswerTime != qcdr.AnswerTime ||
cdr.Usage != qcdr.Usage ||
@@ -80,7 +80,7 @@ func TestStatsSimplifyCDR(t *testing.T) {
}
}
func TestAcceptCDR(t *testing.T) {
func TestAcceptCdr(t *testing.T) {
sq := NewStatsQueue(nil)
cdr := &utils.StoredCdr{
TOR: "tor",
@@ -100,83 +100,83 @@ func TestAcceptCDR(t *testing.T) {
Cost: 10,
}
sq.conf = &CdrStats{}
if sq.acceptCDR(cdr) != true {
if sq.conf.AcceptCdr(cdr) != true {
t.Error("Should have accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{TOR: []string{"test"}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{CdrHost: []string{"test"}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{CdrSource: []string{"test"}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{Direction: []string{"test"}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{Tenant: []string{"test"}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{Category: []string{"test"}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{Account: []string{"test"}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{Subject: []string{"test"}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{RatedAccount: []string{"test"}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{RatedSubject: []string{"test"}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{DestinationPrefix: []string{"test"}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{DestinationPrefix: []string{"test", "123"}}
if sq.acceptCDR(cdr) != true {
if sq.conf.AcceptCdr(cdr) != true {
t.Error("Should have accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC)}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC)}}
if sq.acceptCDR(cdr) != true {
if sq.conf.AcceptCdr(cdr) != true {
t.Error("Should have accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}}
if sq.acceptCDR(cdr) != true {
if sq.conf.AcceptCdr(cdr) != true {
t.Error("Should have accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{UsageInterval: []time.Duration{11 * time.Second}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{UsageInterval: []time.Duration{1 * time.Second, 10 * time.Second}}
if sq.acceptCDR(cdr) == true {
if sq.conf.AcceptCdr(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &CdrStats{UsageInterval: []time.Duration{10 * time.Second, 11 * time.Second}}
if sq.acceptCDR(cdr) != true {
if sq.conf.AcceptCdr(cdr) != true {
t.Error("Should have accepted thif CDR: %+v", cdr)
}
}