mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package engine
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -173,3 +174,36 @@ func (cs *CdrStats) AcceptCdr(cdr *StoredCdr) bool {
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (cs *CdrStats) hasGeneralConfigs() bool {
|
||||
return cs.QueueLength == 0 &&
|
||||
cs.TimeWindow == 0 &&
|
||||
cs.SaveInterval == 0 &&
|
||||
len(cs.Metrics) == 0
|
||||
}
|
||||
|
||||
func (cs *CdrStats) equalExceptTriggers(other *CdrStats) bool {
|
||||
return cs.QueueLength == other.QueueLength &&
|
||||
cs.TimeWindow == other.TimeWindow &&
|
||||
cs.SaveInterval == other.SaveInterval &&
|
||||
reflect.DeepEqual(cs.Metrics, other.Metrics) &&
|
||||
reflect.DeepEqual(cs.SetupInterval, other.SetupInterval) &&
|
||||
reflect.DeepEqual(cs.TOR, other.TOR) &&
|
||||
reflect.DeepEqual(cs.CdrHost, other.CdrHost) &&
|
||||
reflect.DeepEqual(cs.CdrSource, other.CdrSource) &&
|
||||
reflect.DeepEqual(cs.ReqType, other.ReqType) &&
|
||||
reflect.DeepEqual(cs.Direction, other.Direction) &&
|
||||
reflect.DeepEqual(cs.Tenant, other.Tenant) &&
|
||||
reflect.DeepEqual(cs.Category, other.Category) &&
|
||||
reflect.DeepEqual(cs.Account, other.Account) &&
|
||||
reflect.DeepEqual(cs.Subject, other.Subject) &&
|
||||
reflect.DeepEqual(cs.DestinationPrefix, other.DestinationPrefix) &&
|
||||
reflect.DeepEqual(cs.UsageInterval, other.UsageInterval) &&
|
||||
reflect.DeepEqual(cs.PddInterval, other.PddInterval) &&
|
||||
reflect.DeepEqual(cs.Supplier, other.Supplier) &&
|
||||
reflect.DeepEqual(cs.DisconnectCause, other.DisconnectCause) &&
|
||||
reflect.DeepEqual(cs.MediationRunIds, other.MediationRunIds) &&
|
||||
reflect.DeepEqual(cs.RatedAccount, other.RatedAccount) &&
|
||||
reflect.DeepEqual(cs.RatedSubject, other.RatedSubject) &&
|
||||
reflect.DeepEqual(cs.CostInterval, other.CostInterval)
|
||||
}
|
||||
|
||||
@@ -121,6 +121,7 @@ func (s *Stats) AddQueue(cs *CdrStats, out *int) error {
|
||||
sq.UpdateConf(cs)
|
||||
} else {
|
||||
s.queues[cs.Id] = NewStatsQueue(cs)
|
||||
s.setupQueueSaver(sq)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -180,10 +181,14 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
oldQueues := s.queues
|
||||
oldSavers := s.queueSavers
|
||||
s.queues = make(map[string]*StatsQueue, len(css))
|
||||
s.queueSavers = make(map[string]*queueSaver, len(css))
|
||||
if def, exists := oldQueues[utils.META_DEFAULT]; exists {
|
||||
def.UpdateConf(def.conf) // for reset
|
||||
s.queues[utils.META_DEFAULT] = def
|
||||
s.queueSavers[utils.META_DEFAULT] = oldSavers[utils.META_DEFAULT]
|
||||
delete(oldSavers, utils.META_DEFAULT)
|
||||
}
|
||||
for _, cs := range css {
|
||||
var sq *StatsQueue
|
||||
@@ -191,6 +196,8 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error {
|
||||
if oldQueues != nil {
|
||||
if sq, existing = oldQueues[cs.Id]; existing {
|
||||
sq.UpdateConf(cs)
|
||||
s.queueSavers[cs.Id] = oldSavers[cs.Id]
|
||||
delete(oldSavers, cs.Id)
|
||||
}
|
||||
}
|
||||
if sq == nil {
|
||||
@@ -201,23 +208,37 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error {
|
||||
} else {
|
||||
Logger.Info(err.Error())
|
||||
}
|
||||
// setup queue saver
|
||||
if s.queueSavers == nil {
|
||||
s.queueSavers = make(map[string]*queueSaver)
|
||||
}
|
||||
si := cs.SaveInterval
|
||||
if si == 0 {
|
||||
si = s.defaultSaveInterval
|
||||
}
|
||||
if si > 0 {
|
||||
s.queueSavers[cs.Id] = newQueueSaver(cs.Id, si, sq, s.accountingDb)
|
||||
}
|
||||
s.setupQueueSaver(sq)
|
||||
}
|
||||
s.queues[cs.Id] = sq
|
||||
}
|
||||
// stop obsolete savers
|
||||
for _, saver := range oldSavers {
|
||||
saver.stop()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Stats) setupQueueSaver(sq *StatsQueue) {
|
||||
if sq == nil {
|
||||
return
|
||||
}
|
||||
// setup queue saver
|
||||
if s.queueSavers == nil {
|
||||
s.queueSavers = make(map[string]*queueSaver)
|
||||
}
|
||||
var si time.Duration
|
||||
if sq.conf != nil {
|
||||
si = sq.conf.SaveInterval
|
||||
}
|
||||
if si == 0 {
|
||||
si = s.defaultSaveInterval
|
||||
}
|
||||
if si > 0 {
|
||||
s.queueSavers[sq.GetId()] = newQueueSaver(sq.GetId(), si, sq, s.accountingDb)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Stats) AppendCDR(cdr *StoredCdr, out *int) error {
|
||||
s.mux.RLock()
|
||||
defer s.mux.RUnlock()
|
||||
|
||||
@@ -69,6 +69,11 @@ func NewStatsQueue(conf *CdrStats) *StatsQueue {
|
||||
func (sq *StatsQueue) UpdateConf(conf *CdrStats) {
|
||||
sq.mux.Lock()
|
||||
defer sq.mux.Unlock()
|
||||
// check if new conf asks for action trigger reset only
|
||||
if sq.conf != nil && (!conf.hasGeneralConfigs() || sq.conf.equalExceptTriggers(conf)) {
|
||||
sq.conf.Triggers = conf.Triggers
|
||||
return
|
||||
}
|
||||
sq.conf = conf
|
||||
sq.Cdrs = make([]*QCdr, 0)
|
||||
sq.Metrics = make(map[string]Metric, len(conf.Metrics))
|
||||
|
||||
@@ -293,7 +293,7 @@ func TestStatsReloadQueues(t *testing.T) {
|
||||
if err := cdrStats.GetValues("CDRST2", &valMap); err != nil {
|
||||
t.Error("Error getting metric values: ", err)
|
||||
}
|
||||
if len(valMap) != 2 || valMap["ACD"] != STATS_NA || valMap["ASR"] != STATS_NA {
|
||||
if len(valMap) != 2 || valMap["ACD"] != 10 || valMap["ASR"] != 100 {
|
||||
t.Error("Error on metric map: ", valMap)
|
||||
}
|
||||
}
|
||||
@@ -329,7 +329,7 @@ func TestStatsReloadQueuesWithDefault(t *testing.T) {
|
||||
if err := cdrStats.GetValues("CDRST2", &valMap); err != nil {
|
||||
t.Error("Error getting metric values: ", err)
|
||||
}
|
||||
if len(valMap) != 2 || valMap["ACD"] != STATS_NA || valMap["ASR"] != STATS_NA {
|
||||
if len(valMap) != 2 || valMap["ACD"] != 10 || valMap["ASR"] != 100 {
|
||||
t.Error("Error on metric map: ", valMap)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user