diff --git a/data/tariffplans/cdrstats/CdrStats.csv b/data/tariffplans/cdrstats/CdrStats.csv index 1ecb74199..bf60b63b4 100644 --- a/data/tariffplans/cdrstats/CdrStats.csv +++ b/data/tariffplans/cdrstats/CdrStats.csv @@ -1,6 +1,6 @@ -#Id[0],QueueLength[1],TimeWindow[2],SaveInerval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24] -CDRST3,5,60m,,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST3_WARN_ASR -CDRST3,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACD -CDRST3,,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACC -CDRST4,10,0,,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST4_WARN_ASR -CDRST4,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST4_WARN_ACD +#Id[0],QueueLength[1],TimeWindow[2],SaveInerval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24] +CDRST3,5,60m,,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST3_WARN_ASR +CDRST3,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACD +CDRST3,,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACC +CDRST4,10,0,,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST4_WARN_ASR +CDRST4,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST4_WARN_ACD diff --git a/engine/action.go b/engine/action.go index 290f2656d..fb8beeec1 100644 --- a/engine/action.go +++ b/engine/action.go @@ -485,7 +485,7 @@ func mailAsync(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) err message = []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on Balance: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nBalance:\r\n\t%s\r\n\r\nYours faithfully,\r\nCGR Balance Monitor\r\n", toAddrStr, ub.Id, time.Now(), balJsn)) } else if sq != nil { message = []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on StatsQueueId: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nStatsQueueId:\r\n\t%s\r\n\r\nMetrics:\r\n\t%+v\r\n\r\nTrigger:\r\n\t%+v\r\n\r\nYours faithfully,\r\nCGR CDR Stats Monitor\r\n", - toAddrStr, sq.Id, time.Now(), sq.Id, sq.Metrics, sq.Trigger)) + toAddrStr, sq.Id, time.Now(), sq.Id, sq.metrics, sq.Trigger)) } auth := smtp.PlainAuth("", cgrCfg.MailerAuthUser, cgrCfg.MailerAuthPass, strings.Split(cgrCfg.MailerServer, ":")[0]) // We only need host part, so ignore port go func() { diff --git a/engine/stats.go b/engine/stats.go index 2520c228e..c8270033f 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -151,10 +151,10 @@ func (s *Stats) ResetQueues(ids []string, out *int) error { if len(ids) == 0 { for _, sq := range s.queues { sq.Cdrs = make([]*QCdr, 0) - sq.Metrics = make(map[string]Metric, len(sq.conf.Metrics)) + sq.metrics = make(map[string]Metric, len(sq.conf.Metrics)) for _, m := range sq.conf.Metrics { if metric := CreateMetric(m); metric != nil { - sq.Metrics[m] = metric + sq.metrics[m] = metric } } } @@ -166,10 +166,10 @@ func (s *Stats) ResetQueues(ids []string, out *int) error { continue } sq.Cdrs = make([]*QCdr, 0) - sq.Metrics = make(map[string]Metric, len(sq.conf.Metrics)) + sq.metrics = make(map[string]Metric, len(sq.conf.Metrics)) for _, m := range sq.conf.Metrics { if metric := CreateMetric(m); metric != nil { - sq.Metrics[m] = metric + sq.metrics[m] = metric } } } @@ -200,10 +200,11 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error { if sq == nil { sq = NewStatsQueue(cs) // load queue from storage if exists + if saved, err := s.accountingDb.GetCdrStatsQueue(sq.GetId()); err == nil { sq.Load(saved) } else { - Logger.Info(err.Error()) + Logger.Debug(fmt.Sprintf("XXXXXXXXXXX: %v", err)) } s.setupQueueSaver(sq) } diff --git a/engine/stats_queue.go b/engine/stats_queue.go index 048b82cec..3b79263f8 100644 --- a/engine/stats_queue.go +++ b/engine/stats_queue.go @@ -28,7 +28,7 @@ import ( type StatsQueue struct { Cdrs []*QCdr conf *CdrStats - Metrics map[string]Metric + metrics map[string]Metric mux sync.Mutex dirty bool } @@ -59,7 +59,7 @@ type QCdr struct { func NewStatsQueue(conf *CdrStats) *StatsQueue { if conf == nil { - return &StatsQueue{Metrics: make(map[string]Metric)} + return &StatsQueue{metrics: make(map[string]Metric)} } sq := &StatsQueue{} sq.UpdateConf(conf) @@ -76,11 +76,11 @@ func (sq *StatsQueue) UpdateConf(conf *CdrStats) { } sq.conf = conf sq.Cdrs = make([]*QCdr, 0) - sq.Metrics = make(map[string]Metric, len(conf.Metrics)) + sq.metrics = make(map[string]Metric, len(conf.Metrics)) sq.dirty = true for _, m := range conf.Metrics { if metric := CreateMetric(m); metric != nil { - sq.Metrics[m] = metric + sq.metrics[m] = metric } } } @@ -88,21 +88,23 @@ func (sq *StatsQueue) UpdateConf(conf *CdrStats) { func (sq *StatsQueue) Save(adb AccountingStorage) { sq.mux.Lock() defer sq.mux.Unlock() - if sq.dirty { - if err := adb.SetCdrStatsQueue(sq); err != nil { - Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", sq.GetId(), err)) - } + //if sq.dirty { + Logger.Debug(fmt.Sprintf("SAVED: %+v", sq)) + if err := adb.SetCdrStatsQueue(sq); err != nil { + Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", sq.GetId(), err)) + return } + sq.dirty = false + //} } func (sq *StatsQueue) Load(saved *StatsQueue) { sq.mux.Lock() defer sq.mux.Unlock() + Logger.Debug(fmt.Sprintf("LOADED: %+v", saved)) sq.Cdrs = saved.Cdrs - for key, metric := range saved.Metrics { - if _, exists := sq.Metrics[key]; exists { - sq.Metrics[key] = metric - } + for _, qcdr := range saved.Cdrs { + sq.appendQcdr(qcdr) } } @@ -110,30 +112,33 @@ func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) { sq.mux.Lock() defer sq.mux.Unlock() if sq.conf.AcceptCdr(cdr) { - qcdr := sq.simplifyCdr(cdr) - sq.Cdrs = append(sq.Cdrs, qcdr) - sq.addToMetrics(qcdr) - sq.purgeObsoleteCdrs() - sq.dirty = true - // check for trigger - stats := sq.getStats() - sq.conf.Triggers.Sort() - for _, at := range sq.conf.Triggers { - if at.MinQueuedItems > 0 && len(sq.Cdrs) < at.MinQueuedItems { - continue - } - if strings.HasPrefix(at.ThresholdType, "*min_") { - if value, ok := stats[METRIC_TRIGGER_MAP[at.ThresholdType]]; ok { - if value > STATS_NA && value <= at.ThresholdValue { - at.Execute(nil, sq.Triggered(at)) - } + sq.appendQcdr(sq.simplifyCdr(cdr)) + } +} + +func (sq *StatsQueue) appendQcdr(qcdr *QCdr) { + sq.Cdrs = append(sq.Cdrs, qcdr) + sq.addToMetrics(qcdr) + sq.purgeObsoleteCdrs() + sq.dirty = true + // check for trigger + stats := sq.getStats() + sq.conf.Triggers.Sort() + for _, at := range sq.conf.Triggers { + if at.MinQueuedItems > 0 && len(sq.Cdrs) < at.MinQueuedItems { + continue + } + if strings.HasPrefix(at.ThresholdType, "*min_") { + if value, ok := stats[METRIC_TRIGGER_MAP[at.ThresholdType]]; ok { + if value > STATS_NA && value <= at.ThresholdValue { + at.Execute(nil, sq.Triggered(at)) } } - if strings.HasPrefix(at.ThresholdType, "*max_") { - if value, ok := stats[METRIC_TRIGGER_MAP[at.ThresholdType]]; ok { - if value > STATS_NA && value >= at.ThresholdValue { - at.Execute(nil, sq.Triggered(at)) - } + } + if strings.HasPrefix(at.ThresholdType, "*max_") { + if value, ok := stats[METRIC_TRIGGER_MAP[at.ThresholdType]]; ok { + if value > STATS_NA && value >= at.ThresholdValue { + at.Execute(nil, sq.Triggered(at)) } } } @@ -141,13 +146,13 @@ func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) { } func (sq *StatsQueue) addToMetrics(cdr *QCdr) { - for _, metric := range sq.Metrics { + for _, metric := range sq.metrics { metric.AddCdr(cdr) } } func (sq *StatsQueue) removeFromMetrics(cdr *QCdr) { - for _, metric := range sq.Metrics { + for _, metric := range sq.metrics { metric.RemoveCdr(cdr) } } @@ -195,8 +200,8 @@ func (sq *StatsQueue) GetStats() map[string]float64 { } func (sq *StatsQueue) getStats() map[string]float64 { - stat := make(map[string]float64, len(sq.Metrics)) - for key, metric := range sq.Metrics { + stat := make(map[string]float64, len(sq.metrics)) + for key, metric := range sq.metrics { stat[key] = metric.GetValue() } return stat @@ -208,12 +213,12 @@ func (sq *StatsQueue) GetId() string { // Convert data into a struct which can be used in actions based on triggers hit func (sq *StatsQueue) Triggered(at *ActionTrigger) *StatsQueueTriggered { - return &StatsQueueTriggered{Id: sq.conf.Id, Metrics: sq.getStats(), Trigger: at} + return &StatsQueueTriggered{Id: sq.conf.Id, metrics: sq.getStats(), Trigger: at} } // Struct to be passed to triggered actions type StatsQueueTriggered struct { Id string // StatsQueueId - Metrics map[string]float64 + metrics map[string]float64 Trigger *ActionTrigger } diff --git a/engine/stats_test.go b/engine/stats_test.go index 6745eaf95..5b8f705d9 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -27,8 +27,8 @@ import ( func TestStatsQueueInit(t *testing.T) { sq := NewStatsQueue(&CdrStats{Metrics: []string{ASR, ACC}}) - if len(sq.Metrics) != 2 { - t.Error("Expected 2 metrics got ", len(sq.Metrics)) + if len(sq.metrics) != 2 { + t.Error("Expected 2 metrics got ", len(sq.metrics)) } } @@ -447,3 +447,20 @@ func TestStatsResetQueuesWithIds(t *testing.T) { t.Error("Error on metric map: ", valMap) } } + +func TestStatsSaveRestoreQeue(t *testing.T) { + sq := &StatsQueue{ + conf: &CdrStats{Id: "TTT"}, + Cdrs: []*QCdr{&QCdr{Cost: 9.0}}, + } + if err := accountingStorage.SetCdrStatsQueue(sq); err != nil { + t.Error("Error saving metric: ", err) + } + recovered, err := accountingStorage.GetCdrStatsQueue(sq.GetId()) + if err != nil { + t.Error("Error loading metric: ", err) + } + if len(recovered.Cdrs) != 1 || recovered.Cdrs[0].Cost != sq.Cdrs[0].Cost { + t.Errorf("Expecting %+v got: %+v", sq.Cdrs[0], recovered.Cdrs[0]) + } +} diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 131c70280..41bb26b74 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -677,8 +677,8 @@ func (rs *RedisStorage) SetAccount(ub *Account) (err error) { func (rs *RedisStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) { var values []byte if values, err = rs.db.Get(utils.CDR_STATS_QUEUE_PREFIX + key); err == nil { - sq = &StatsQueue{Metrics: make(map[string]Metric)} - err = rs.ms.Unmarshal(values, sq) + sq = &StatsQueue{} + err = rs.ms.Unmarshal(values, &sq) } return }