This commit is contained in:
DanB
2015-06-30 10:52:16 +02:00
6 changed files with 79 additions and 56 deletions

View File

@@ -1,6 +1,6 @@
#Id[0],QueueLength[1],TimeWindow[2],SaveInerval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24]
CDRST3,5,60m,,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST3_WARN_ASR
CDRST3,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACD
CDRST3,,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACC
CDRST4,10,0,,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST4_WARN_ASR
CDRST4,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST4_WARN_ACD
#Id[0],QueueLength[1],TimeWindow[2],SaveInerval[3],Metric[4],SetupInterval[5],TOR[6],CdrHost[7],CdrSource[8],ReqType[9],Direction[10],Tenant[11],Category[12],Account[13],Subject[14],DestinationPrefix[15],PddInterval[16],UsageInterval[17],Supplier[18],DisconnectCause[19],MediationRunIds[20],RatedAccount[21],RatedSubject[22],CostInterval[23],Triggers[24]
CDRST3,5,60m,,ASR,2014-07-29T15:00:00Z;2014-07-29T16:00:00Z,*voice,87.139.12.167,FS_JSON,rated,*out,cgrates.org,call,dan,dan,+49,,5m;10m,,,default,rif,rif,0;2,CDRST3_WARN_ASR
CDRST3,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACD
CDRST3,,,,ACC,,,,,,,,,,,,,,,,,,,,CDRST3_WARN_ACC
CDRST4,10,0,,ASR,,,,,,,cgrates.org,call,,,,,,,,,,,,CDRST4_WARN_ASR
CDRST4,,,,ACD,,,,,,,,,,,,,,,,,,,,CDRST4_WARN_ACD
1 #Id[0] QueueLength[1] TimeWindow[2] SaveInerval[3] Metric[4] SetupInterval[5] TOR[6] CdrHost[7] CdrSource[8] ReqType[9] Direction[10] Tenant[11] Category[12] Account[13] Subject[14] DestinationPrefix[15] PddInterval[16] UsageInterval[17] Supplier[18] DisconnectCause[19] MediationRunIds[20] RatedAccount[21] RatedSubject[22] CostInterval[23] Triggers[24]
2 CDRST3 5 60m ASR 2014-07-29T15:00:00Z;2014-07-29T16:00:00Z *voice 87.139.12.167 FS_JSON rated *out cgrates.org call dan dan +49 5m;10m default rif rif 0;2 CDRST3_WARN_ASR
3 CDRST3 ACD CDRST3_WARN_ACD
4 CDRST3 ACC CDRST3_WARN_ACC
5 CDRST4 10 0 ASR cgrates.org call CDRST4_WARN_ASR
6 CDRST4 ACD CDRST4_WARN_ACD

View File

@@ -485,7 +485,7 @@ func mailAsync(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) err
message = []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on Balance: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nBalance:\r\n\t%s\r\n\r\nYours faithfully,\r\nCGR Balance Monitor\r\n", toAddrStr, ub.Id, time.Now(), balJsn))
} else if sq != nil {
message = []byte(fmt.Sprintf("To: %s\r\nSubject: [CGR Notification] Threshold hit on StatsQueueId: %s\r\n\r\nTime: \r\n\t%s\r\n\r\nStatsQueueId:\r\n\t%s\r\n\r\nMetrics:\r\n\t%+v\r\n\r\nTrigger:\r\n\t%+v\r\n\r\nYours faithfully,\r\nCGR CDR Stats Monitor\r\n",
toAddrStr, sq.Id, time.Now(), sq.Id, sq.Metrics, sq.Trigger))
toAddrStr, sq.Id, time.Now(), sq.Id, sq.metrics, sq.Trigger))
}
auth := smtp.PlainAuth("", cgrCfg.MailerAuthUser, cgrCfg.MailerAuthPass, strings.Split(cgrCfg.MailerServer, ":")[0]) // We only need host part, so ignore port
go func() {

View File

@@ -151,10 +151,10 @@ func (s *Stats) ResetQueues(ids []string, out *int) error {
if len(ids) == 0 {
for _, sq := range s.queues {
sq.Cdrs = make([]*QCdr, 0)
sq.Metrics = make(map[string]Metric, len(sq.conf.Metrics))
sq.metrics = make(map[string]Metric, len(sq.conf.Metrics))
for _, m := range sq.conf.Metrics {
if metric := CreateMetric(m); metric != nil {
sq.Metrics[m] = metric
sq.metrics[m] = metric
}
}
}
@@ -166,10 +166,10 @@ func (s *Stats) ResetQueues(ids []string, out *int) error {
continue
}
sq.Cdrs = make([]*QCdr, 0)
sq.Metrics = make(map[string]Metric, len(sq.conf.Metrics))
sq.metrics = make(map[string]Metric, len(sq.conf.Metrics))
for _, m := range sq.conf.Metrics {
if metric := CreateMetric(m); metric != nil {
sq.Metrics[m] = metric
sq.metrics[m] = metric
}
}
}
@@ -200,10 +200,11 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error {
if sq == nil {
sq = NewStatsQueue(cs)
// load queue from storage if exists
if saved, err := s.accountingDb.GetCdrStatsQueue(sq.GetId()); err == nil {
sq.Load(saved)
} else {
Logger.Info(err.Error())
Logger.Debug(fmt.Sprintf("XXXXXXXXXXX: %v", err))
}
s.setupQueueSaver(sq)
}

View File

@@ -28,7 +28,7 @@ import (
type StatsQueue struct {
Cdrs []*QCdr
conf *CdrStats
Metrics map[string]Metric
metrics map[string]Metric
mux sync.Mutex
dirty bool
}
@@ -59,7 +59,7 @@ type QCdr struct {
func NewStatsQueue(conf *CdrStats) *StatsQueue {
if conf == nil {
return &StatsQueue{Metrics: make(map[string]Metric)}
return &StatsQueue{metrics: make(map[string]Metric)}
}
sq := &StatsQueue{}
sq.UpdateConf(conf)
@@ -76,11 +76,11 @@ func (sq *StatsQueue) UpdateConf(conf *CdrStats) {
}
sq.conf = conf
sq.Cdrs = make([]*QCdr, 0)
sq.Metrics = make(map[string]Metric, len(conf.Metrics))
sq.metrics = make(map[string]Metric, len(conf.Metrics))
sq.dirty = true
for _, m := range conf.Metrics {
if metric := CreateMetric(m); metric != nil {
sq.Metrics[m] = metric
sq.metrics[m] = metric
}
}
}
@@ -88,21 +88,23 @@ func (sq *StatsQueue) UpdateConf(conf *CdrStats) {
func (sq *StatsQueue) Save(adb AccountingStorage) {
sq.mux.Lock()
defer sq.mux.Unlock()
if sq.dirty {
if err := adb.SetCdrStatsQueue(sq); err != nil {
Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", sq.GetId(), err))
}
//if sq.dirty {
Logger.Debug(fmt.Sprintf("SAVED: %+v", sq))
if err := adb.SetCdrStatsQueue(sq); err != nil {
Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", sq.GetId(), err))
return
}
sq.dirty = false
//}
}
func (sq *StatsQueue) Load(saved *StatsQueue) {
sq.mux.Lock()
defer sq.mux.Unlock()
Logger.Debug(fmt.Sprintf("LOADED: %+v", saved))
sq.Cdrs = saved.Cdrs
for key, metric := range saved.Metrics {
if _, exists := sq.Metrics[key]; exists {
sq.Metrics[key] = metric
}
for _, qcdr := range saved.Cdrs {
sq.appendQcdr(qcdr)
}
}
@@ -110,30 +112,33 @@ func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) {
sq.mux.Lock()
defer sq.mux.Unlock()
if sq.conf.AcceptCdr(cdr) {
qcdr := sq.simplifyCdr(cdr)
sq.Cdrs = append(sq.Cdrs, qcdr)
sq.addToMetrics(qcdr)
sq.purgeObsoleteCdrs()
sq.dirty = true
// check for trigger
stats := sq.getStats()
sq.conf.Triggers.Sort()
for _, at := range sq.conf.Triggers {
if at.MinQueuedItems > 0 && len(sq.Cdrs) < at.MinQueuedItems {
continue
}
if strings.HasPrefix(at.ThresholdType, "*min_") {
if value, ok := stats[METRIC_TRIGGER_MAP[at.ThresholdType]]; ok {
if value > STATS_NA && value <= at.ThresholdValue {
at.Execute(nil, sq.Triggered(at))
}
sq.appendQcdr(sq.simplifyCdr(cdr))
}
}
func (sq *StatsQueue) appendQcdr(qcdr *QCdr) {
sq.Cdrs = append(sq.Cdrs, qcdr)
sq.addToMetrics(qcdr)
sq.purgeObsoleteCdrs()
sq.dirty = true
// check for trigger
stats := sq.getStats()
sq.conf.Triggers.Sort()
for _, at := range sq.conf.Triggers {
if at.MinQueuedItems > 0 && len(sq.Cdrs) < at.MinQueuedItems {
continue
}
if strings.HasPrefix(at.ThresholdType, "*min_") {
if value, ok := stats[METRIC_TRIGGER_MAP[at.ThresholdType]]; ok {
if value > STATS_NA && value <= at.ThresholdValue {
at.Execute(nil, sq.Triggered(at))
}
}
if strings.HasPrefix(at.ThresholdType, "*max_") {
if value, ok := stats[METRIC_TRIGGER_MAP[at.ThresholdType]]; ok {
if value > STATS_NA && value >= at.ThresholdValue {
at.Execute(nil, sq.Triggered(at))
}
}
if strings.HasPrefix(at.ThresholdType, "*max_") {
if value, ok := stats[METRIC_TRIGGER_MAP[at.ThresholdType]]; ok {
if value > STATS_NA && value >= at.ThresholdValue {
at.Execute(nil, sq.Triggered(at))
}
}
}
@@ -141,13 +146,13 @@ func (sq *StatsQueue) AppendCDR(cdr *StoredCdr) {
}
func (sq *StatsQueue) addToMetrics(cdr *QCdr) {
for _, metric := range sq.Metrics {
for _, metric := range sq.metrics {
metric.AddCdr(cdr)
}
}
func (sq *StatsQueue) removeFromMetrics(cdr *QCdr) {
for _, metric := range sq.Metrics {
for _, metric := range sq.metrics {
metric.RemoveCdr(cdr)
}
}
@@ -195,8 +200,8 @@ func (sq *StatsQueue) GetStats() map[string]float64 {
}
func (sq *StatsQueue) getStats() map[string]float64 {
stat := make(map[string]float64, len(sq.Metrics))
for key, metric := range sq.Metrics {
stat := make(map[string]float64, len(sq.metrics))
for key, metric := range sq.metrics {
stat[key] = metric.GetValue()
}
return stat
@@ -208,12 +213,12 @@ func (sq *StatsQueue) GetId() string {
// Convert data into a struct which can be used in actions based on triggers hit
func (sq *StatsQueue) Triggered(at *ActionTrigger) *StatsQueueTriggered {
return &StatsQueueTriggered{Id: sq.conf.Id, Metrics: sq.getStats(), Trigger: at}
return &StatsQueueTriggered{Id: sq.conf.Id, metrics: sq.getStats(), Trigger: at}
}
// Struct to be passed to triggered actions
type StatsQueueTriggered struct {
Id string // StatsQueueId
Metrics map[string]float64
metrics map[string]float64
Trigger *ActionTrigger
}

View File

@@ -27,8 +27,8 @@ import (
func TestStatsQueueInit(t *testing.T) {
sq := NewStatsQueue(&CdrStats{Metrics: []string{ASR, ACC}})
if len(sq.Metrics) != 2 {
t.Error("Expected 2 metrics got ", len(sq.Metrics))
if len(sq.metrics) != 2 {
t.Error("Expected 2 metrics got ", len(sq.metrics))
}
}
@@ -447,3 +447,20 @@ func TestStatsResetQueuesWithIds(t *testing.T) {
t.Error("Error on metric map: ", valMap)
}
}
func TestStatsSaveRestoreQeue(t *testing.T) {
sq := &StatsQueue{
conf: &CdrStats{Id: "TTT"},
Cdrs: []*QCdr{&QCdr{Cost: 9.0}},
}
if err := accountingStorage.SetCdrStatsQueue(sq); err != nil {
t.Error("Error saving metric: ", err)
}
recovered, err := accountingStorage.GetCdrStatsQueue(sq.GetId())
if err != nil {
t.Error("Error loading metric: ", err)
}
if len(recovered.Cdrs) != 1 || recovered.Cdrs[0].Cost != sq.Cdrs[0].Cost {
t.Errorf("Expecting %+v got: %+v", sq.Cdrs[0], recovered.Cdrs[0])
}
}

View File

@@ -677,8 +677,8 @@ func (rs *RedisStorage) SetAccount(ub *Account) (err error) {
func (rs *RedisStorage) GetCdrStatsQueue(key string) (sq *StatsQueue, err error) {
var values []byte
if values, err = rs.db.Get(utils.CDR_STATS_QUEUE_PREFIX + key); err == nil {
sq = &StatsQueue{Metrics: make(map[string]Metric)}
err = rs.ms.Unmarshal(values, sq)
sq = &StatsQueue{}
err = rs.ms.Unmarshal(values, &sq)
}
return
}