diff --git a/engine/stats.go b/engine/stats.go index 3192efe04..1c83b775d 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -60,7 +60,7 @@ func newQueueSaver(id string, saveInterval time.Duration, sq *StatsQueue, adb Ac select { case <-c: if sq.IsDirty() { - if err := accountDb.SetCdrStatsQueue(id, sq); err != nil { + if err := accountDb.SetCdrStatsQueue(sq); err != nil { Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", id, err)) } } @@ -147,8 +147,8 @@ func (s *Stats) ResetQueues(ids []string, out *int) error { if len(ids) == 0 { for _, sq := range s.queues { sq.Cdrs = make([]*QCdr, 0) - sq.Metrics = make(map[string]Metric, len(sq.Conf.Metrics)) - for _, m := range sq.Conf.Metrics { + sq.Metrics = make(map[string]Metric, len(sq.conf.Metrics)) + for _, m := range sq.conf.Metrics { if metric := CreateMetric(m); metric != nil { sq.Metrics[m] = metric } @@ -162,8 +162,8 @@ func (s *Stats) ResetQueues(ids []string, out *int) error { continue } sq.Cdrs = make([]*QCdr, 0) - sq.Metrics = make(map[string]Metric, len(sq.Conf.Metrics)) - for _, m := range sq.Conf.Metrics { + sq.Metrics = make(map[string]Metric, len(sq.conf.Metrics)) + for _, m := range sq.conf.Metrics { if metric := CreateMetric(m); metric != nil { sq.Metrics[m] = metric } @@ -181,9 +181,8 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error { defer s.mux.Unlock() oldQueues := s.queues s.queues = make(map[string]*StatsQueue, len(css)) - s.queueSavers = make(map[string]*queueSaver) if def, exists := oldQueues[utils.META_DEFAULT]; exists { - def.UpdateConf(def.Conf) // for reset + def.UpdateConf(def.conf) // for reset s.queues[utils.META_DEFAULT] = def } for _, cs := range css { @@ -196,14 +195,23 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error { } if sq == nil { sq = NewStatsQueue(cs) - } - - si := cs.SaveInterval - if si == 0 { - si = s.defaultSaveInterval - } - if si > 0 { - s.queueSavers[cs.Id] = newQueueSaver(cs.Id, si, sq, s.accountingDb) + // load queue from storage if exists + if saved, err := s.accountingDb.GetCdrStatsQueue(sq.GetId()); err == nil { + sq.Load(saved) + } else { + Logger.Info(err.Error()) + } + // setup queue saver + if s.queueSavers == nil { + s.queueSavers = make(map[string]*queueSaver) + } + si := cs.SaveInterval + if si == 0 { + si = s.defaultSaveInterval + } + if si > 0 { + s.queueSavers[cs.Id] = newQueueSaver(cs.Id, si, sq, s.accountingDb) + } } s.queues[cs.Id] = sq } diff --git a/engine/stats_queue.go b/engine/stats_queue.go index 5c76111a3..4176bd49c 100644 --- a/engine/stats_queue.go +++ b/engine/stats_queue.go @@ -19,6 +19,7 @@ along with this program. If not, see package engine import ( + "log" "strings" "sync" "time" @@ -26,7 +27,7 @@ import ( type StatsQueue struct { Cdrs []*QCdr - Conf *CdrStats + conf *CdrStats Metrics map[string]Metric mux sync.Mutex dirty bool @@ -68,7 +69,7 @@ func NewStatsQueue(conf *CdrStats) *StatsQueue { func (sq *StatsQueue) UpdateConf(conf *CdrStats) { sq.mux.Lock() defer sq.mux.Unlock() - sq.Conf = conf + sq.conf = conf sq.Cdrs = make([]*QCdr, 0) sq.Metrics = make(map[string]Metric, len(conf.Metrics)) sq.dirty = true @@ -79,10 +80,20 @@ func (sq *StatsQueue) UpdateConf(conf *CdrStats) { } } +func (sq *StatsQueue) Load(saved *StatsQueue) { + sq.Cdrs = saved.Cdrs + for key, metric := range saved.Metrics { + if _, exists := sq.Metrics[key]; exists { + sq.Metrics[key] = metric + } + } +} + func (sq *StatsQueue) IsDirty() bool { sq.mux.Lock() defer sq.mux.Unlock() v := sq.dirty + log.Print(v) // take advantage of the locking to set it to flip it sq.dirty = false return v @@ -91,7 +102,7 @@ func (sq *StatsQueue) IsDirty() bool { func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) { sq.mux.Lock() defer sq.mux.Unlock() - if sq.Conf.AcceptCdr(cdr) { + if sq.conf.AcceptCdr(cdr) { qcdr := sq.simplifyCdr(cdr) sq.Cdrs = append(sq.Cdrs, qcdr) sq.addToMetrics(qcdr) @@ -99,8 +110,8 @@ func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) { sq.dirty = true // check for trigger stats := sq.getStats() - sq.Conf.Triggers.Sort() - for _, at := range sq.Conf.Triggers { + sq.conf.Triggers.Sort() + for _, at := range sq.conf.Triggers { if at.MinQueuedItems > 0 && len(sq.Cdrs) < at.MinQueuedItems { continue } @@ -123,14 +134,12 @@ func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) { } func (sq *StatsQueue) addToMetrics(cdr *QCdr) { - sq.dirty = true for _, metric := range sq.Metrics { metric.AddCdr(cdr) } } func (sq *StatsQueue) removeFromMetrics(cdr *QCdr) { - sq.dirty = true for _, metric := range sq.Metrics { metric.RemoveCdr(cdr) } @@ -147,18 +156,18 @@ func (sq *StatsQueue) simplifyCdr(cdr *StoredCdr) *QCdr { } func (sq *StatsQueue) purgeObsoleteCdrs() { - if sq.Conf.QueueLength > 0 { + if sq.conf.QueueLength > 0 { currentLength := len(sq.Cdrs) - if currentLength > sq.Conf.QueueLength { - for _, cdr := range sq.Cdrs[:currentLength-sq.Conf.QueueLength] { + if currentLength > sq.conf.QueueLength { + for _, cdr := range sq.Cdrs[:currentLength-sq.conf.QueueLength] { sq.removeFromMetrics(cdr) } - sq.Cdrs = sq.Cdrs[currentLength-sq.Conf.QueueLength:] + sq.Cdrs = sq.Cdrs[currentLength-sq.conf.QueueLength:] } } - if sq.Conf.TimeWindow > 0 { + if sq.conf.TimeWindow > 0 { for i, cdr := range sq.Cdrs { - if time.Now().Sub(cdr.SetupTime) > sq.Conf.TimeWindow { + if time.Now().Sub(cdr.SetupTime) > sq.conf.TimeWindow { sq.removeFromMetrics(cdr) continue } else { @@ -187,12 +196,12 @@ func (sq *StatsQueue) getStats() map[string]float64 { } func (sq *StatsQueue) GetId() string { - return sq.Conf.Id + return sq.conf.Id } // Convert data into a struct which can be used in actions based on triggers hit func (sq *StatsQueue) Triggered(at *ActionTrigger) *StatsQueueTriggered { - return &StatsQueueTriggered{Id: sq.Conf.Id, Metrics: sq.getStats(), Trigger: at} + return &StatsQueueTriggered{Id: sq.conf.Id, Metrics: sq.getStats(), Trigger: at} } // Struct to be passed to triggered actions diff --git a/engine/stats_test.go b/engine/stats_test.go index af054c129..bdaace3ad 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -104,100 +104,100 @@ func TestAcceptCdr(t *testing.T) { MediationRunId: "mri", Cost: 10, } - sq.Conf = &CdrStats{} - if sq.Conf.AcceptCdr(cdr) != true { + sq.conf = &CdrStats{} + if sq.conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{TOR: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{TOR: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{CdrHost: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{CdrHost: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{CdrSource: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{CdrSource: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{Direction: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{Direction: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{Tenant: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{Tenant: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{Category: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{Category: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{Account: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{Account: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{Subject: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{Subject: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{Supplier: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{Supplier: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{DisconnectCause: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{DisconnectCause: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{RatedAccount: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{RatedAccount: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{RatedSubject: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{RatedSubject: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{DestinationPrefix: []string{"test"}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{DestinationPrefix: []string{"test"}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{DestinationPrefix: []string{"test", "123"}} - if sq.Conf.AcceptCdr(cdr) != true { + sq.conf = &CdrStats{DestinationPrefix: []string{"test", "123"}} + if sq.conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC)}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC)}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC)}} - if sq.Conf.AcceptCdr(cdr) != true { + sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC)}} + if sq.conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}} - if sq.Conf.AcceptCdr(cdr) != true { + sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}} + if sq.conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{UsageInterval: []time.Duration{11 * time.Second}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{UsageInterval: []time.Duration{11 * time.Second}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{UsageInterval: []time.Duration{1 * time.Second, 10 * time.Second}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{UsageInterval: []time.Duration{1 * time.Second, 10 * time.Second}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{PddInterval: []time.Duration{8 * time.Second}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{PddInterval: []time.Duration{8 * time.Second}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 7 * time.Second}} - if sq.Conf.AcceptCdr(cdr) == true { + sq.conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 7 * time.Second}} + if sq.conf.AcceptCdr(cdr) == true { t.Errorf("Should have NOT accepted this CDR: %+v", cdr) } - sq.Conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 8 * time.Second}} - if sq.Conf.AcceptCdr(cdr) != true { + sq.conf = &CdrStats{PddInterval: []time.Duration{3 * time.Second, 8 * time.Second}} + if sq.conf.AcceptCdr(cdr) != true { t.Errorf("Should have accepted this CDR: %+v", cdr) } } @@ -287,7 +287,7 @@ func TestStatsReloadQueues(t *testing.T) { result := len(ids) expected := 2 if result != expected { - t.Errorf("Error loading stats queues. Expected %v was %v", expected, result) + t.Errorf("Error loading stats queues. Expected %v was %v: %v", expected, result, ids) } valMap := make(map[string]float64) if err := cdrStats.GetValues("CDRST2", &valMap); err != nil { @@ -366,6 +366,24 @@ func TestStatsReloadQueuesWithIds(t *testing.T) { } } +func TestStatsSaveQueues(t *testing.T) { + cdrStats := NewStats(ratingStorage, accountingStorage, 0) + cdr := &StoredCdr{ + Tenant: "cgrates.org", + Category: "call", + AnswerTime: time.Now(), + SetupTime: time.Now(), + Usage: 10 * time.Second, + Cost: 10, + } + cdrStats.AppendCDR(cdr, nil) + ids := []string{} + cdrStats.GetQueueIds(0, &ids) + if _, found := cdrStats.queueSavers["CDRST1"]; !found { + t.Error("Error creating queue savers: ", cdrStats.queueSavers) + } +} + func TestStatsResetQueues(t *testing.T) { cdrStats := NewStats(ratingStorage, accountingStorage, 0) cdr := &StoredCdr{