diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 66b3e7ae8..b6e2b7871 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -463,7 +463,7 @@ func main() { } if cfg.CDRStatsEnabled { - cdrStats = &engine.Stats{} + cdrStats = engine.NewStats(accountDb) server.RpcRegister(cdrStats) server.RpcRegister(apier.CDRStatsV1{cdrStats}) // Public APIs } diff --git a/engine/cdrstats.go b/engine/cdrstats.go index 21b0fb2bd..fb4ecc3f6 100644 --- a/engine/cdrstats.go +++ b/engine/cdrstats.go @@ -18,7 +18,12 @@ along with this program. If not, see package engine -import "time" +import ( + "strings" + "time" + + "github.com/cgrates/cgrates/utils" +) type CdrStats struct { Id string // Config id, unique per config instance @@ -43,3 +48,79 @@ type CdrStats struct { CostInterval []float64 // 2 or less items, (>=Cost, 0 { + if cdr.SetupTime.Before(cs.SetupInterval[0]) { + return false + } + if len(cs.SetupInterval) > 1 && (cdr.SetupTime.Equal(cs.SetupInterval[1]) || cdr.SetupTime.After(cs.SetupInterval[1])) { + return false + } + } + if len(cs.TOR) > 0 && !utils.IsSliceMember(cs.TOR, cdr.TOR) { + return false + } + if len(cs.CdrHost) > 0 && !utils.IsSliceMember(cs.CdrHost, cdr.CdrHost) { + return false + } + if len(cs.CdrSource) > 0 && !utils.IsSliceMember(cs.CdrSource, cdr.CdrSource) { + return false + } + if len(cs.ReqType) > 0 && !utils.IsSliceMember(cs.ReqType, cdr.ReqType) { + return false + } + if len(cs.Direction) > 0 && !utils.IsSliceMember(cs.Direction, cdr.Direction) { + return false + } + if len(cs.Tenant) > 0 && !utils.IsSliceMember(cs.Tenant, cdr.Tenant) { + return false + } + if len(cs.Category) > 0 && !utils.IsSliceMember(cs.Category, cdr.Category) { + return false + } + if len(cs.Account) > 0 && !utils.IsSliceMember(cs.Account, cdr.Account) { + return false + } + if len(cs.Subject) > 0 && !utils.IsSliceMember(cs.Subject, cdr.Subject) { + return false + } + if len(cs.DestinationPrefix) > 0 { + found := false + for _, prefix := range cs.DestinationPrefix { + if strings.HasPrefix(cdr.Destination, prefix) { + found = true + break + } + } + if !found { + return false + } + } + if len(cs.UsageInterval) > 0 { + if cdr.Usage < cs.UsageInterval[0] { + return false + } + if len(cs.UsageInterval) > 1 && cdr.Usage >= cs.UsageInterval[1] { + return false + } + } + if len(cs.MediationRunIds) > 0 && !utils.IsSliceMember(cs.MediationRunIds, cdr.MediationRunId) { + return false + } + if len(cs.CostInterval) > 0 { + if cdr.Cost < cs.CostInterval[0] { + return false + } + if len(cs.CostInterval) > 1 && cdr.Cost >= cs.CostInterval[1] { + return false + } + } + if len(cs.RatedAccount) > 0 && !utils.IsSliceMember(cs.RatedAccount, cdr.RatedAccount) { + return false + } + if len(cs.RatedSubject) > 0 && !utils.IsSliceMember(cs.RatedSubject, cdr.RatedSubject) { + return false + } + return true +} diff --git a/engine/stats.go b/engine/stats.go index 71b1f23a8..e1d347580 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -20,6 +20,7 @@ package engine import ( "errors" + "fmt" "net/rpc" "sync" @@ -27,7 +28,7 @@ import ( ) type StatsInterface interface { - AddQueue(*StatsQueue, *int) error + AddQueue(*CdrStats, *int) error GetValues(string, *map[string]float64) error AppendCDR(*utils.StoredCdr, *int) error } @@ -37,16 +38,30 @@ type Stats struct { mux sync.RWMutex } -func (s *Stats) AddQueue(sq *StatsQueue, out *int) error { +func (s *Stats) AddQueue(cs *CdrStats, out *int) error { s.mux.Lock() defer s.mux.Unlock() if s.queues == nil { s.queues = make(map[string]*StatsQueue) } - s.queues[sq.conf.Id] = sq + if sq, exists := s.queues[cs.Id]; exists { + sq.UpdateConf(cs) + } else { + s.queues[cs.Id] = NewStatsQueue(cs) + } return nil } +func NewStats(accountDb AccountingStorage) *Stats { + cdrStats := &Stats{} + if css, err := accountDb.GetAllCdrStats(); err == nil { + cdrStats.UpdateQueues(css, nil) + } else { + Logger.Err(fmt.Sprintf("Cannot load cdr stats: %v", err)) + } + return cdrStats +} + func (s *Stats) GetValues(sqID string, values *map[string]float64) error { s.mux.RLock() defer s.mux.RUnlock() @@ -100,14 +115,14 @@ func NewProxyStats(addr string) (*ProxyStats, error) { return &ProxyStats{Client: client}, nil } -func (ps *ProxyStats) AddQueue(sq *StatsQueue, out *int) error { - return ps.Client.Call("Scribe.AddQueue", sq, out) +func (ps *ProxyStats) AddQueue(cs *CdrStats, out *int) error { + return ps.Client.Call("Stats.AddQueue", cs, out) } func (ps *ProxyStats) GetValues(sqID string, values *map[string]float64) error { - return ps.Client.Call("Scribe.GetValues", sqID, values) + return ps.Client.Call("Stats.GetValues", sqID, values) } func (ps *ProxyStats) AppendCDR(cdr *utils.StoredCdr, out *int) error { - return ps.Client.Call("Scribe.AppendCDR", cdr, out) + return ps.Client.Call("Stats.AppendCDR", cdr, out) } diff --git a/engine/stats_metrics.go b/engine/stats_metrics.go index e3633a6c2..007543f8e 100644 --- a/engine/stats_metrics.go +++ b/engine/stats_metrics.go @@ -21,8 +21,8 @@ package engine import "time" type Metric interface { - AddCDR(*QCDR) - RemoveCDR(*QCDR) + AddCdr(*QCdr) + RemoveCdr(*QCdr) GetValue() float64 } @@ -49,14 +49,14 @@ type ASRMetric struct { total float64 } -func (asr *ASRMetric) AddCDR(cdr *QCDR) { +func (asr *ASRMetric) AddCdr(cdr *QCdr) { if !cdr.AnswerTime.IsZero() { asr.answered += 1 } asr.total += 1 } -func (asr *ASRMetric) RemoveCDR(cdr *QCDR) { +func (asr *ASRMetric) RemoveCdr(cdr *QCdr) { if !cdr.AnswerTime.IsZero() { asr.answered -= 1 } @@ -74,14 +74,14 @@ type ACDMetric struct { count float64 } -func (acd *ACDMetric) AddCDR(cdr *QCDR) { +func (acd *ACDMetric) AddCdr(cdr *QCdr) { if !cdr.AnswerTime.IsZero() { acd.sum += cdr.Usage acd.count += 1 } } -func (acd *ACDMetric) RemoveCDR(cdr *QCDR) { +func (acd *ACDMetric) RemoveCdr(cdr *QCdr) { if !cdr.AnswerTime.IsZero() { acd.sum -= cdr.Usage acd.count -= 1 @@ -99,14 +99,14 @@ type ACCMetric struct { count float64 } -func (acc *ACCMetric) AddCDR(cdr *QCDR) { +func (acc *ACCMetric) AddCdr(cdr *QCdr) { if !cdr.AnswerTime.IsZero() && cdr.Cost >= 0 { acc.sum += cdr.Cost acc.count += 1 } } -func (acc *ACCMetric) RemoveCDR(cdr *QCDR) { +func (acc *ACCMetric) RemoveCdr(cdr *QCdr) { if !cdr.AnswerTime.IsZero() && cdr.Cost >= 0 { acc.sum -= cdr.Cost acc.count -= 1 diff --git a/engine/stats_queue.go b/engine/stats_queue.go index e3a1939b3..4f900ffa2 100644 --- a/engine/stats_queue.go +++ b/engine/stats_queue.go @@ -27,14 +27,14 @@ import ( ) type StatsQueue struct { - cdrs []*QCDR + cdrs []*QCdr conf *CdrStats metrics map[string]Metric mux sync.RWMutex } // Simplified cdr structure containing only the necessary info -type QCDR struct { +type QCdr struct { SetupTime time.Time AnswerTime time.Time Usage time.Duration @@ -45,27 +45,30 @@ func NewStatsQueue(conf *CdrStats) *StatsQueue { if conf == nil { return &StatsQueue{metrics: make(map[string]Metric)} } - sq := &StatsQueue{ - conf: conf, - metrics: make(map[string]Metric, len(conf.Metrics)), - } + sq := &StatsQueue{} + sq.UpdateConf(conf) + return sq +} + +func (sq *StatsQueue) UpdateConf(conf *CdrStats) { + sq.conf = conf + sq.metrics = make(map[string]Metric, len(conf.Metrics)) for _, m := range conf.Metrics { metric := CreateMetric(m) if metric != nil { sq.metrics[m] = metric } } - return sq } func (sq *StatsQueue) AppendCDR(cdr *utils.StoredCdr) { sq.mux.Lock() defer sq.mux.Unlock() - if sq.acceptCDR(cdr) { - qcdr := sq.simplifyCDR(cdr) + if sq.conf.AcceptCdr(cdr) { + qcdr := sq.simplifyCdr(cdr) sq.cdrs = append(sq.cdrs, qcdr) sq.addToMetrics(qcdr) - sq.purgeObsoleteCDRs() + sq.purgeObsoleteCdrs() // check for trigger stats := sq.getStats() sq.conf.Triggers.Sort() @@ -91,20 +94,20 @@ func (sq *StatsQueue) AppendCDR(cdr *utils.StoredCdr) { } } -func (sq *StatsQueue) addToMetrics(cdr *QCDR) { +func (sq *StatsQueue) addToMetrics(cdr *QCdr) { for _, metric := range sq.metrics { - metric.AddCDR(cdr) + metric.AddCdr(cdr) } } -func (sq *StatsQueue) removeFromMetrics(cdr *QCDR) { +func (sq *StatsQueue) removeFromMetrics(cdr *QCdr) { for _, metric := range sq.metrics { - metric.RemoveCDR(cdr) + metric.RemoveCdr(cdr) } } -func (sq *StatsQueue) simplifyCDR(cdr *utils.StoredCdr) *QCDR { - return &QCDR{ +func (sq *StatsQueue) simplifyCdr(cdr *utils.StoredCdr) *QCdr { + return &QCdr{ SetupTime: cdr.SetupTime, AnswerTime: cdr.AnswerTime, Usage: cdr.Usage, @@ -112,7 +115,7 @@ func (sq *StatsQueue) simplifyCDR(cdr *utils.StoredCdr) *QCDR { } } -func (sq *StatsQueue) purgeObsoleteCDRs() { +func (sq *StatsQueue) purgeObsoleteCdrs() { if sq.conf.QueueLength > 0 { currentLength := len(sq.cdrs) if currentLength > sq.conf.QueueLength { @@ -150,79 +153,3 @@ func (sq *StatsQueue) getStats() map[string]float64 { } return stat } - -func (sq *StatsQueue) acceptCDR(cdr *utils.StoredCdr) bool { - if len(sq.conf.SetupInterval) > 0 { - if cdr.SetupTime.Before(sq.conf.SetupInterval[0]) { - return false - } - if len(sq.conf.SetupInterval) > 1 && (cdr.SetupTime.Equal(sq.conf.SetupInterval[1]) || cdr.SetupTime.After(sq.conf.SetupInterval[1])) { - return false - } - } - if len(sq.conf.TOR) > 0 && !utils.IsSliceMember(sq.conf.TOR, cdr.TOR) { - return false - } - if len(sq.conf.CdrHost) > 0 && !utils.IsSliceMember(sq.conf.CdrHost, cdr.CdrHost) { - return false - } - if len(sq.conf.CdrSource) > 0 && !utils.IsSliceMember(sq.conf.CdrSource, cdr.CdrSource) { - return false - } - if len(sq.conf.ReqType) > 0 && !utils.IsSliceMember(sq.conf.ReqType, cdr.ReqType) { - return false - } - if len(sq.conf.Direction) > 0 && !utils.IsSliceMember(sq.conf.Direction, cdr.Direction) { - return false - } - if len(sq.conf.Tenant) > 0 && !utils.IsSliceMember(sq.conf.Tenant, cdr.Tenant) { - return false - } - if len(sq.conf.Category) > 0 && !utils.IsSliceMember(sq.conf.Category, cdr.Category) { - return false - } - if len(sq.conf.Account) > 0 && !utils.IsSliceMember(sq.conf.Account, cdr.Account) { - return false - } - if len(sq.conf.Subject) > 0 && !utils.IsSliceMember(sq.conf.Subject, cdr.Subject) { - return false - } - if len(sq.conf.DestinationPrefix) > 0 { - found := false - for _, prefix := range sq.conf.DestinationPrefix { - if strings.HasPrefix(cdr.Destination, prefix) { - found = true - break - } - } - if !found { - return false - } - } - if len(sq.conf.UsageInterval) > 0 { - if cdr.Usage < sq.conf.UsageInterval[0] { - return false - } - if len(sq.conf.UsageInterval) > 1 && cdr.Usage >= sq.conf.UsageInterval[1] { - return false - } - } - if len(sq.conf.MediationRunIds) > 0 && !utils.IsSliceMember(sq.conf.MediationRunIds, cdr.MediationRunId) { - return false - } - if len(sq.conf.CostInterval) > 0 { - if cdr.Cost < sq.conf.CostInterval[0] { - return false - } - if len(sq.conf.CostInterval) > 1 && cdr.Cost >= sq.conf.CostInterval[1] { - return false - } - } - if len(sq.conf.RatedAccount) > 0 && !utils.IsSliceMember(sq.conf.RatedAccount, cdr.RatedAccount) { - return false - } - if len(sq.conf.RatedSubject) > 0 && !utils.IsSliceMember(sq.conf.RatedSubject, cdr.RatedSubject) { - return false - } - return true -} diff --git a/engine/stats_test.go b/engine/stats_test.go index 933f43246..360d19698 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -71,7 +71,7 @@ func TestStatsSimplifyCDR(t *testing.T) { Cost: 10, } sq := &StatsQueue{} - qcdr := sq.simplifyCDR(cdr) + qcdr := sq.simplifyCdr(cdr) if cdr.SetupTime != qcdr.SetupTime || cdr.AnswerTime != qcdr.AnswerTime || cdr.Usage != qcdr.Usage || @@ -80,7 +80,7 @@ func TestStatsSimplifyCDR(t *testing.T) { } } -func TestAcceptCDR(t *testing.T) { +func TestAcceptCdr(t *testing.T) { sq := NewStatsQueue(nil) cdr := &utils.StoredCdr{ TOR: "tor", @@ -100,83 +100,83 @@ func TestAcceptCDR(t *testing.T) { Cost: 10, } sq.conf = &CdrStats{} - if sq.acceptCDR(cdr) != true { + if sq.conf.AcceptCdr(cdr) != true { t.Error("Should have accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{TOR: []string{"test"}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{CdrHost: []string{"test"}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{CdrSource: []string{"test"}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{Direction: []string{"test"}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{Tenant: []string{"test"}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{Category: []string{"test"}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{Account: []string{"test"}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{Subject: []string{"test"}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{RatedAccount: []string{"test"}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{RatedSubject: []string{"test"}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{DestinationPrefix: []string{"test"}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{DestinationPrefix: []string{"test", "123"}} - if sq.acceptCDR(cdr) != true { + if sq.conf.AcceptCdr(cdr) != true { t.Error("Should have accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC)}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC)}} - if sq.acceptCDR(cdr) != true { + if sq.conf.AcceptCdr(cdr) != true { t.Error("Should have accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}} - if sq.acceptCDR(cdr) != true { + if sq.conf.AcceptCdr(cdr) != true { t.Error("Should have accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{UsageInterval: []time.Duration{11 * time.Second}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{UsageInterval: []time.Duration{1 * time.Second, 10 * time.Second}} - if sq.acceptCDR(cdr) == true { + if sq.conf.AcceptCdr(cdr) == true { t.Error("Should have NOT accepted thif CDR: %+v", cdr) } sq.conf = &CdrStats{UsageInterval: []time.Duration{10 * time.Second, 11 * time.Second}} - if sq.acceptCDR(cdr) != true { + if sq.conf.AcceptCdr(cdr) != true { t.Error("Should have accepted thif CDR: %+v", cdr) } }