diff --git a/engine/cdrstats.go b/engine/cdrstats.go index b5076eda7..4a1ee931a 100644 --- a/engine/cdrstats.go +++ b/engine/cdrstats.go @@ -19,6 +19,7 @@ along with this program. If not, see package engine import ( + "reflect" "strings" "time" @@ -173,3 +174,36 @@ func (cs *CdrStats) AcceptCdr(cdr *StoredCdr) bool { } return true } + +func (cs *CdrStats) hasGeneralConfigs() bool { + return cs.QueueLength == 0 && + cs.TimeWindow == 0 && + cs.SaveInterval == 0 && + len(cs.Metrics) == 0 +} + +func (cs *CdrStats) equalExceptTriggers(other *CdrStats) bool { + return cs.QueueLength == other.QueueLength && + cs.TimeWindow == other.TimeWindow && + cs.SaveInterval == other.SaveInterval && + reflect.DeepEqual(cs.Metrics, other.Metrics) && + reflect.DeepEqual(cs.SetupInterval, other.SetupInterval) && + reflect.DeepEqual(cs.TOR, other.TOR) && + reflect.DeepEqual(cs.CdrHost, other.CdrHost) && + reflect.DeepEqual(cs.CdrSource, other.CdrSource) && + reflect.DeepEqual(cs.ReqType, other.ReqType) && + reflect.DeepEqual(cs.Direction, other.Direction) && + reflect.DeepEqual(cs.Tenant, other.Tenant) && + reflect.DeepEqual(cs.Category, other.Category) && + reflect.DeepEqual(cs.Account, other.Account) && + reflect.DeepEqual(cs.Subject, other.Subject) && + reflect.DeepEqual(cs.DestinationPrefix, other.DestinationPrefix) && + reflect.DeepEqual(cs.UsageInterval, other.UsageInterval) && + reflect.DeepEqual(cs.PddInterval, other.PddInterval) && + reflect.DeepEqual(cs.Supplier, other.Supplier) && + reflect.DeepEqual(cs.DisconnectCause, other.DisconnectCause) && + reflect.DeepEqual(cs.MediationRunIds, other.MediationRunIds) && + reflect.DeepEqual(cs.RatedAccount, other.RatedAccount) && + reflect.DeepEqual(cs.RatedSubject, other.RatedSubject) && + reflect.DeepEqual(cs.CostInterval, other.CostInterval) +} diff --git a/engine/stats.go b/engine/stats.go index 1c83b775d..7d494965b 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -121,6 +121,7 @@ func (s *Stats) AddQueue(cs *CdrStats, out *int) error { sq.UpdateConf(cs) } else { s.queues[cs.Id] = NewStatsQueue(cs) + s.setupQueueSaver(sq) } return nil } @@ -180,10 +181,14 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error { s.mux.Lock() defer s.mux.Unlock() oldQueues := s.queues + oldSavers := s.queueSavers s.queues = make(map[string]*StatsQueue, len(css)) + s.queueSavers = make(map[string]*queueSaver, len(css)) if def, exists := oldQueues[utils.META_DEFAULT]; exists { def.UpdateConf(def.conf) // for reset s.queues[utils.META_DEFAULT] = def + s.queueSavers[utils.META_DEFAULT] = oldSavers[utils.META_DEFAULT] + delete(oldSavers, utils.META_DEFAULT) } for _, cs := range css { var sq *StatsQueue @@ -191,6 +196,8 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error { if oldQueues != nil { if sq, existing = oldQueues[cs.Id]; existing { sq.UpdateConf(cs) + s.queueSavers[cs.Id] = oldSavers[cs.Id] + delete(oldSavers, cs.Id) } } if sq == nil { @@ -201,23 +208,37 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error { } else { Logger.Info(err.Error()) } - // setup queue saver - if s.queueSavers == nil { - s.queueSavers = make(map[string]*queueSaver) - } - si := cs.SaveInterval - if si == 0 { - si = s.defaultSaveInterval - } - if si > 0 { - s.queueSavers[cs.Id] = newQueueSaver(cs.Id, si, sq, s.accountingDb) - } + s.setupQueueSaver(sq) } s.queues[cs.Id] = sq } + // stop obsolete savers + for _, saver := range oldSavers { + saver.stop() + } return nil } +func (s *Stats) setupQueueSaver(sq *StatsQueue) { + if sq == nil { + return + } + // setup queue saver + if s.queueSavers == nil { + s.queueSavers = make(map[string]*queueSaver) + } + var si time.Duration + if sq.conf != nil { + si = sq.conf.SaveInterval + } + if si == 0 { + si = s.defaultSaveInterval + } + if si > 0 { + s.queueSavers[sq.GetId()] = newQueueSaver(sq.GetId(), si, sq, s.accountingDb) + } +} + func (s *Stats) AppendCDR(cdr *StoredCdr, out *int) error { s.mux.RLock() defer s.mux.RUnlock() diff --git a/engine/stats_queue.go b/engine/stats_queue.go index 4176bd49c..98c482336 100644 --- a/engine/stats_queue.go +++ b/engine/stats_queue.go @@ -69,6 +69,11 @@ func NewStatsQueue(conf *CdrStats) *StatsQueue { func (sq *StatsQueue) UpdateConf(conf *CdrStats) { sq.mux.Lock() defer sq.mux.Unlock() + // check if new conf asks for action trigger reset only + if sq.conf != nil && (!conf.hasGeneralConfigs() || sq.conf.equalExceptTriggers(conf)) { + sq.conf.Triggers = conf.Triggers + return + } sq.conf = conf sq.Cdrs = make([]*QCdr, 0) sq.Metrics = make(map[string]Metric, len(conf.Metrics)) diff --git a/engine/stats_test.go b/engine/stats_test.go index bdaace3ad..68efe8c64 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -293,7 +293,7 @@ func TestStatsReloadQueues(t *testing.T) { if err := cdrStats.GetValues("CDRST2", &valMap); err != nil { t.Error("Error getting metric values: ", err) } - if len(valMap) != 2 || valMap["ACD"] != STATS_NA || valMap["ASR"] != STATS_NA { + if len(valMap) != 2 || valMap["ACD"] != 10 || valMap["ASR"] != 100 { t.Error("Error on metric map: ", valMap) } } @@ -329,7 +329,7 @@ func TestStatsReloadQueuesWithDefault(t *testing.T) { if err := cdrStats.GetValues("CDRST2", &valMap); err != nil { t.Error("Error getting metric values: ", err) } - if len(valMap) != 2 || valMap["ACD"] != STATS_NA || valMap["ASR"] != STATS_NA { + if len(valMap) != 2 || valMap["ACD"] != 10 || valMap["ASR"] != 100 { t.Error("Error on metric map: ", valMap) } }