first draft of cdr stats queues save/load

This commit is contained in:
Radu Ioan Fericean
2015-06-19 22:55:27 +03:00
parent 3d0553ac37
commit ef5413118f
3 changed files with 114 additions and 79 deletions

View File

@@ -60,7 +60,7 @@ func newQueueSaver(id string, saveInterval time.Duration, sq *StatsQueue, adb Ac
select {
case <-c:
if sq.IsDirty() {
if err := accountDb.SetCdrStatsQueue(id, sq); err != nil {
if err := accountDb.SetCdrStatsQueue(sq); err != nil {
Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", id, err))
}
}
@@ -147,8 +147,8 @@ func (s *Stats) ResetQueues(ids []string, out *int) error {
if len(ids) == 0 {
for _, sq := range s.queues {
sq.Cdrs = make([]*QCdr, 0)
sq.Metrics = make(map[string]Metric, len(sq.Conf.Metrics))
for _, m := range sq.Conf.Metrics {
sq.Metrics = make(map[string]Metric, len(sq.conf.Metrics))
for _, m := range sq.conf.Metrics {
if metric := CreateMetric(m); metric != nil {
sq.Metrics[m] = metric
}
@@ -162,8 +162,8 @@ func (s *Stats) ResetQueues(ids []string, out *int) error {
continue
}
sq.Cdrs = make([]*QCdr, 0)
sq.Metrics = make(map[string]Metric, len(sq.Conf.Metrics))
for _, m := range sq.Conf.Metrics {
sq.Metrics = make(map[string]Metric, len(sq.conf.Metrics))
for _, m := range sq.conf.Metrics {
if metric := CreateMetric(m); metric != nil {
sq.Metrics[m] = metric
}
@@ -181,9 +181,8 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error {
defer s.mux.Unlock()
oldQueues := s.queues
s.queues = make(map[string]*StatsQueue, len(css))
s.queueSavers = make(map[string]*queueSaver)
if def, exists := oldQueues[utils.META_DEFAULT]; exists {
def.UpdateConf(def.Conf) // for reset
def.UpdateConf(def.conf) // for reset
s.queues[utils.META_DEFAULT] = def
}
for _, cs := range css {
@@ -196,14 +195,23 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error {
}
if sq == nil {
sq = NewStatsQueue(cs)
}
si := cs.SaveInterval
if si == 0 {
si = s.defaultSaveInterval
}
if si > 0 {
s.queueSavers[cs.Id] = newQueueSaver(cs.Id, si, sq, s.accountingDb)
// load queue from storage if exists
if saved, err := s.accountingDb.GetCdrStatsQueue(sq.GetId()); err == nil {
sq.Load(saved)
} else {
Logger.Info(err.Error())
}
// setup queue saver
if s.queueSavers == nil {
s.queueSavers = make(map[string]*queueSaver)
}
si := cs.SaveInterval
if si == 0 {
si = s.defaultSaveInterval
}
if si > 0 {
s.queueSavers[cs.Id] = newQueueSaver(cs.Id, si, sq, s.accountingDb)
}
}
s.queues[cs.Id] = sq
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"log"
"strings"
"sync"
"time"
@@ -26,7 +27,7 @@ import (
type StatsQueue struct {
Cdrs []*QCdr
Conf *CdrStats
conf *CdrStats
Metrics map[string]Metric
mux sync.Mutex
dirty bool
@@ -68,7 +69,7 @@ func NewStatsQueue(conf *CdrStats) *StatsQueue {
func (sq *StatsQueue) UpdateConf(conf *CdrStats) {
sq.mux.Lock()
defer sq.mux.Unlock()
sq.Conf = conf
sq.conf = conf
sq.Cdrs = make([]*QCdr, 0)
sq.Metrics = make(map[string]Metric, len(conf.Metrics))
sq.dirty = true
@@ -79,10 +80,20 @@ func (sq *StatsQueue) UpdateConf(conf *CdrStats) {
}
}
func (sq *StatsQueue) Load(saved *StatsQueue) {
sq.Cdrs = saved.Cdrs
for key, metric := range saved.Metrics {
if _, exists := sq.Metrics[key]; exists {
sq.Metrics[key] = metric
}
}
}
func (sq *StatsQueue) IsDirty() bool {
sq.mux.Lock()
defer sq.mux.Unlock()
v := sq.dirty
log.Print(v)
// take advantage of the locking to set it to flip it
sq.dirty = false
return v
@@ -91,7 +102,7 @@ func (sq *StatsQueue) IsDirty() bool {
func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) {
sq.mux.Lock()
defer sq.mux.Unlock()
if sq.Conf.AcceptCdr(cdr) {
if sq.conf.AcceptCdr(cdr) {
qcdr := sq.simplifyCdr(cdr)
sq.Cdrs = append(sq.Cdrs, qcdr)
sq.addToMetrics(qcdr)
@@ -99,8 +110,8 @@ func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) {
sq.dirty = true
// check for trigger
stats := sq.getStats()
sq.Conf.Triggers.Sort()
for _, at := range sq.Conf.Triggers {
sq.conf.Triggers.Sort()
for _, at := range sq.conf.Triggers {
if at.MinQueuedItems > 0 && len(sq.Cdrs) < at.MinQueuedItems {
continue
}
@@ -123,14 +134,12 @@ func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) {
}
func (sq *StatsQueue) addToMetrics(cdr *QCdr) {
sq.dirty = true
for _, metric := range sq.Metrics {
metric.AddCdr(cdr)
}
}
func (sq *StatsQueue) removeFromMetrics(cdr *QCdr) {
sq.dirty = true
for _, metric := range sq.Metrics {
metric.RemoveCdr(cdr)
}
@@ -147,18 +156,18 @@ func (sq *StatsQueue) simplifyCdr(cdr *StoredCdr) *QCdr {
}
func (sq *StatsQueue) purgeObsoleteCdrs() {
if sq.Conf.QueueLength > 0 {
if sq.conf.QueueLength > 0 {
currentLength := len(sq.Cdrs)
if currentLength > sq.Conf.QueueLength {
for _, cdr := range sq.Cdrs[:currentLength-sq.Conf.QueueLength] {
if currentLength > sq.conf.QueueLength {
for _, cdr := range sq.Cdrs[:currentLength-sq.conf.QueueLength] {
sq.removeFromMetrics(cdr)
}
sq.Cdrs = sq.Cdrs[currentLength-sq.Conf.QueueLength:]
sq.Cdrs = sq.Cdrs[currentLength-sq.conf.QueueLength:]
}
}
if sq.Conf.TimeWindow > 0 {
if sq.conf.TimeWindow > 0 {
for i, cdr := range sq.Cdrs {
if time.Now().Sub(cdr.SetupTime) > sq.Conf.TimeWindow {
if time.Now().Sub(cdr.SetupTime) > sq.conf.TimeWindow {
sq.removeFromMetrics(cdr)
continue
} else {
@@ -187,12 +196,12 @@ func (sq *StatsQueue) getStats() map[string]float64 {
}
func (sq *StatsQueue) GetId() string {
return sq.Conf.Id
return sq.conf.Id
}
// Convert data into a struct which can be used in actions based on triggers hit
func (sq *StatsQueue) Triggered(at *ActionTrigger) *StatsQueueTriggered {
return &StatsQueueTriggered{Id: sq.Conf.Id, Metrics: sq.getStats(), Trigger: at}
return &StatsQueueTriggered{Id: sq.conf.Id, Metrics: sq.getStats(), Trigger: at}
}
// Struct to be passed to triggered actions

View File

@@ -104,100 +104,100 @@ func TestAcceptCdr(t *testing.T) {
MediationRunId: "mri",
Cost: 10,
}
sq.Conf = &CdrStats{}
if sq.Conf.AcceptCdr(cdr) != true {
sq.conf = &CdrStats{}
if sq.conf.AcceptCdr(cdr) != true {
t.Errorf("Should have accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{TOR: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{TOR: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{CdrHost: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{CdrHost: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{CdrSource: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{CdrSource: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{Direction: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{Direction: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{Tenant: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{Tenant: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{Category: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{Category: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{Account: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{Account: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{Subject: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{Subject: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{Supplier: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{Supplier: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{DisconnectCause: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{DisconnectCause: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{RatedAccount: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{RatedAccount: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{RatedSubject: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{RatedSubject: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{DestinationPrefix: []string{"test"}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{DestinationPrefix: []string{"test"}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{DestinationPrefix: []string{"test", "123"}}
if sq.Conf.AcceptCdr(cdr) != true {
sq.conf = &CdrStats{DestinationPrefix: []string{"test", "123"}}
if sq.conf.AcceptCdr(cdr) != true {
t.Errorf("Should have accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC)}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC)}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC)}}
if sq.Conf.AcceptCdr(cdr) != true {
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC)}}
if sq.conf.AcceptCdr(cdr) != true {
t.Errorf("Should have accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}}
if sq.Conf.AcceptCdr(cdr) != true {
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}}
if sq.conf.AcceptCdr(cdr) != true {
t.Errorf("Should have accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{UsageInterval: []time.Duration{11 * time.Second}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{UsageInterval: []time.Duration{11 * time.Second}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{UsageInterval: []time.Duration{1 * time.Second, 10 * time.Second}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{UsageInterval: []time.Duration{1 * time.Second, 10 * time.Second}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{PddInterval: []time.Duration{8 * time.Second}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{PddInterval: []time.Duration{8 * time.Second}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 7 * time.Second}}
if sq.Conf.AcceptCdr(cdr) == true {
sq.conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 7 * time.Second}}
if sq.conf.AcceptCdr(cdr) == true {
t.Errorf("Should have NOT accepted this CDR: %+v", cdr)
}
sq.Conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 8 * time.Second}}
if sq.Conf.AcceptCdr(cdr) != true {
sq.conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 8 * time.Second}}
if sq.conf.AcceptCdr(cdr) != true {
t.Errorf("Should have accepted this CDR: %+v", cdr)
}
}
@@ -287,7 +287,7 @@ func TestStatsReloadQueues(t *testing.T) {
result := len(ids)
expected := 2
if result != expected {
t.Errorf("Error loading stats queues. Expected %v was %v", expected, result)
t.Errorf("Error loading stats queues. Expected %v was %v: %v", expected, result, ids)
}
valMap := make(map[string]float64)
if err := cdrStats.GetValues("CDRST2", &valMap); err != nil {
@@ -366,6 +366,24 @@ func TestStatsReloadQueuesWithIds(t *testing.T) {
}
}
func TestStatsSaveQueues(t *testing.T) {
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
cdr := &StoredCdr{
Tenant: "cgrates.org",
Category: "call",
AnswerTime: time.Now(),
SetupTime: time.Now(),
Usage: 10 * time.Second,
Cost: 10,
}
cdrStats.AppendCDR(cdr, nil)
ids := []string{}
cdrStats.GetQueueIds(0, &ids)
if _, found := cdrStats.queueSavers["CDRST1"]; !found {
t.Error("Error creating queue savers: ", cdrStats.queueSavers)
}
}
func TestStatsResetQueues(t *testing.T) {
cdrStats := NewStats(ratingStorage, accountingStorage, 0)
cdr := &StoredCdr{