started stats triggers

This commit is contained in:
Radu Ioan Fericean
2014-07-15 15:43:33 +03:00
parent 77da8b0daf
commit 26fac17b33
10 changed files with 227 additions and 134 deletions

View File

@@ -55,43 +55,43 @@ func SetCgrConfig(cfg *CGRConfig) {
// Holds system configuration, defaults are overwritten with values from config file if found
type CGRConfig struct {
RatingDBType string
RatingDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets.
RatingDBPort string // The port to bind to.
RatingDBName string // The name of the database to connect to.
RatingDBUser string // The user to sign in as.
RatingDBPass string // The user's password.
AccountDBType string
AccountDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets.
AccountDBPort string // The port to bind to.
AccountDBName string // The name of the database to connect to.
AccountDBUser string // The user to sign in as.
AccountDBPass string // The user's password.
StorDBType string // Should reflect the database type used to store logs
StorDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets.
StorDBPort string // Th e port to bind to.
StorDBName string // The name of the database to connect to.
StorDBUser string // The user to sign in as.
StorDBPass string // The user's password.
DBDataEncoding string // The encoding used to store object data in strings: <msgpack|json>
RPCJSONListen string // RPC JSON listening address
RPCGOBListen string // RPC GOB listening address
HTTPListen string // HTTP listening address
DefaultReqType string // Use this request type if not defined on top
DefaultCategory string // set default type of record
DefaultTenant string // set default tenant
DefaultSubject string // set default rating subject, useful in case of fallback
RoundingDecimals int // Number of decimals to round end prices at
HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate
XmlCfgDocument *CgrXmlCfgDocument // Load additional configuration inside xml document
RaterEnabled bool // start standalone server (no balancer)
RaterBalancer string // balancer address host:port
BalancerEnabled bool
SchedulerEnabled bool
CDRSEnabled bool // Enable CDR Server service
CDRSExtraFields []*utils.RSRField // Extra fields to store in CDRs
CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
CdrStatsConfigs []*CdrStatsConfig // Active cdr stats configuration instances
RatingDBType string
RatingDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets.
RatingDBPort string // The port to bind to.
RatingDBName string // The name of the database to connect to.
RatingDBUser string // The user to sign in as.
RatingDBPass string // The user's password.
AccountDBType string
AccountDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets.
AccountDBPort string // The port to bind to.
AccountDBName string // The name of the database to connect to.
AccountDBUser string // The user to sign in as.
AccountDBPass string // The user's password.
StorDBType string // Should reflect the database type used to store logs
StorDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets.
StorDBPort string // Th e port to bind to.
StorDBName string // The name of the database to connect to.
StorDBUser string // The user to sign in as.
StorDBPass string // The user's password.
DBDataEncoding string // The encoding used to store object data in strings: <msgpack|json>
RPCJSONListen string // RPC JSON listening address
RPCGOBListen string // RPC GOB listening address
HTTPListen string // HTTP listening address
DefaultReqType string // Use this request type if not defined on top
DefaultCategory string // set default type of record
DefaultTenant string // set default tenant
DefaultSubject string // set default rating subject, useful in case of fallback
RoundingDecimals int // Number of decimals to round end prices at
HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate
XmlCfgDocument *CgrXmlCfgDocument // Load additional configuration inside xml document
RaterEnabled bool // start standalone server (no balancer)
RaterBalancer string // balancer address host:port
BalancerEnabled bool
SchedulerEnabled bool
CDRSEnabled bool // Enable CDR Server service
CDRSExtraFields []*utils.RSRField // Extra fields to store in CDRs
CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
//CdrStats []*cdrstats.CdrStats // Active cdr stats configuration instances
CdreDefaultInstance *CdreConfig // Will be used in the case no specific one selected by API
CdrcEnabled bool // Enable CDR client functionality
CdrcCdrs string // Address where to reach CDR server

View File

@@ -326,6 +326,11 @@ func (ub *Account) refundIncrement(increment *Increment, direction, unitType str
func (ub *Account) executeActionTriggers(a *Action) {
ub.ActionTriggers.Sort()
for _, at := range ub.ActionTriggers {
// sanity check
if !strings.Contains(at.ThresholdType, "counter") &&
strings.Contains(at.ThresholdType, "balance") {
continue
}
if at.Executed {
// trigger is marked as executed, so skipp it until
// the next reset (see RESET_TRIGGERS action type)
@@ -454,7 +459,7 @@ func (ub *Account) initCounters() {
}
}
func (ub *Account) CleanExpiredBalancesAndBuckets() {
func (ub *Account) CleanExpiredBalances() {
for key, bm := range ub.BalanceMap {
for i := 0; i < len(bm); i++ {
if bm[i].IsExpired() {

View File

@@ -891,7 +891,7 @@ func TestCleanExpired(t *testing.T) {
&Balance{ExpirationDate: time.Now().Add(10 * time.Second)},
}},
}
ub.CleanExpiredBalancesAndBuckets()
ub.CleanExpiredBalances()
if len(ub.BalanceMap[CREDIT+OUTBOUND]) != 2 {
t.Error("Error cleaning expired balances!")
}

View File

@@ -22,24 +22,34 @@ import (
"encoding/json"
"fmt"
"sort"
"time"
"github.com/cgrates/cgrates/utils"
)
type ActionTrigger struct {
Id string // uniquely identify the trigger
BalanceType string
Direction string
ThresholdType string //*min_counter, *max_counter, *min_balance, *max_balance
ThresholdValue float64
Recurrent bool // reset eexcuted flag each run
DestinationId string
Weight float64
ActionsId string
Executed bool
Id string // uniquely identify the trigger
BalanceType string
Direction string
ThresholdType string //*min_counter, *max_counter, *min_balance, *max_balance
// stats: *min_asr, *max_asr, *min_acd, *max_acd, *min_acc, *max_acc
ThresholdValue float64
Recurrent bool // reset eexcuted flag each run
MinSleep time.Duration // Minimum duration between two executions in case of recurrent triggers
DestinationId string
Weight float64
ActionsId string
Executed bool
MinQueuedItems int // Trigger actions only if this number is hit (stats only)
lastExecutionTime time.Time
}
func (at *ActionTrigger) Execute(ub *Account) (err error) {
// check for min sleep time
if at.Recurrent && !at.lastExecutionTime.IsZero() && time.Since(at.lastExecutionTime) < at.MinSleep {
return
}
at.lastExecutionTime = time.Now()
if ub.Disabled {
return fmt.Errorf("User %s is disabled", ub.Id)
}

View File

@@ -475,6 +475,7 @@ func (bc BalanceChain) HasBalance(balance *Balance) bool {
func (bc BalanceChain) SaveDirtyBalances(acc *Account) {
for _, b := range bc {
// TODO: check if teh account was not already saved ?
if b.account != nil && b.account != acc && b.dirty {
accountingStorage.SetAccount(b.account)
}

View File

@@ -16,13 +16,11 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package config
package engine
import (
"time"
)
import "time"
type CdrStatsConfig struct {
type CdrStats struct {
Id string // Config id, unique per config instance
QueuedItems int // Number of items in the stats buffer
TimeWindow time.Duration // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow
@@ -41,15 +39,5 @@ type CdrStatsConfig struct {
UsageInterval []time.Duration // 2 or less items (>= Usage, <Usage)
MediationRunIds []string
CostInterval []float64 // 2 or less items, (>=Cost, <Cost)
CdrStatsTriggers []*CdrStatsTrigger
}
type CdrStatsTrigger struct {
ThresholdType string // *min_asr, *max_asr, *min_acd, *max_acd, *min_acc, *max_acc
ThresholdValue float64
MinQueuedItems int // Trigger actions only if this number is hit
MinSleep time.Duration // Minimum duration between two executions in case of recurrent triggers
ActionsId string // Id of actions to be executed
Recurrent bool // Re-enable automatically once executed
Weight float64
Triggers ActionTriggerPriotityList
}

54
engine/stats.go Normal file
View File

@@ -0,0 +1,54 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2013 ITsysCOM
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"errors"
"sync"
"github.com/cgrates/cgrates/utils"
)
type Stats struct {
queues map[string]*StatsQueue
mux sync.RWMutex
}
func (s *Stats) AddQueue(sq *StatsQueue) {
s.mux.Lock()
defer s.mux.Unlock()
s.queues[sq.conf.Id] = sq
}
func (s *Stats) GetValues(sqID string) (map[string]float64, error) {
s.mux.RLock()
defer s.mux.RUnlock()
if sq, ok := s.queues[sqID]; ok {
return sq.GetStats(), nil
}
return nil, errors.New("Not Found")
}
func (s *Stats) AppendCDR(cdr *utils.StoredCdr) {
s.mux.RLock()
defer s.mux.RUnlock()
for _, sq := range s.queues {
sq.AppendCDR(cdr)
}
}

View File

@@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package cdrstats
package engine
import "time"
@@ -26,9 +26,9 @@ type Metric interface {
GetValue() float64
}
const ASR = "ASR"
const ACD = "ACD"
const ACC = "ACC"
const ASR = "asr"
const ACD = "acd"
const ACC = "acc"
func CreateMetric(metric string) Metric {
switch metric {

View File

@@ -16,20 +16,21 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package cdrstats
package engine
import (
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
type StatsQueue struct {
cdrs []*QCDR
conf *config.CdrStatsConfig
conf *CdrStats
metrics map[string]Metric
mux sync.RWMutex
}
// Simplified cdr structure containing only the necessary info
@@ -40,7 +41,7 @@ type QCDR struct {
Cost float64
}
func NewStatsQueue(conf *config.CdrStatsConfig) *StatsQueue {
func NewStatsQueue(conf *CdrStats) *StatsQueue {
if conf == nil {
return &StatsQueue{metrics: make(map[string]Metric)}
}
@@ -58,10 +59,35 @@ func NewStatsQueue(conf *config.CdrStatsConfig) *StatsQueue {
}
func (sq *StatsQueue) AppendCDR(cdr *utils.StoredCdr) {
if sq.AcceptCDR(cdr) {
qcdr := sq.SimplifyCDR(cdr)
sq.mux.Lock()
defer sq.mux.Unlock()
if sq.acceptCDR(cdr) {
qcdr := sq.simplifyCDR(cdr)
sq.cdrs = append(sq.cdrs, qcdr)
sq.addToMetrics(qcdr)
sq.purgeObsoleteCDRs()
// check for trigger
stats := sq.getStats()
sq.conf.Triggers.Sort()
for _, at := range sq.conf.Triggers {
if at.MinQueuedItems > 0 && len(sq.cdrs) < at.MinQueuedItems {
continue
}
if strings.HasPrefix(at.ThresholdType, "*min_") {
if value, ok := stats[at.ThresholdType[len("*min_"):]]; ok {
if value <= at.ThresholdValue {
//at.Execute()
}
}
}
if strings.HasPrefix(at.ThresholdType, "*max_") {
if value, ok := stats[at.ThresholdType[len("*max_"):]]; ok {
if value >= at.ThresholdValue {
//at.Execute()
}
}
}
}
}
}
@@ -77,7 +103,7 @@ func (sq *StatsQueue) removeFromMetrics(cdr *QCDR) {
}
}
func (sq *StatsQueue) SimplifyCDR(cdr *utils.StoredCdr) *QCDR {
func (sq *StatsQueue) simplifyCDR(cdr *utils.StoredCdr) *QCDR {
return &QCDR{
SetupTime: cdr.SetupTime,
AnswerTime: cdr.AnswerTime,
@@ -86,28 +112,38 @@ func (sq *StatsQueue) SimplifyCDR(cdr *utils.StoredCdr) *QCDR {
}
}
func (sq *StatsQueue) PurgeObsoleteCDRs() {
currentLength := len(sq.cdrs)
if currentLength > sq.conf.QueuedItems {
for _, cdr := range sq.cdrs[:currentLength-sq.conf.QueuedItems] {
sq.removeFromMetrics(cdr)
}
sq.cdrs = sq.cdrs[currentLength-sq.conf.QueuedItems:]
}
for i, cdr := range sq.cdrs {
if time.Now().Sub(cdr.SetupTime) > sq.conf.TimeWindow {
sq.removeFromMetrics(cdr)
continue
} else {
if i > 0 {
sq.cdrs = sq.cdrs[i:]
func (sq *StatsQueue) purgeObsoleteCDRs() {
if sq.conf.QueuedItems > 0 {
currentLength := len(sq.cdrs)
if currentLength > sq.conf.QueuedItems {
for _, cdr := range sq.cdrs[:currentLength-sq.conf.QueuedItems] {
sq.removeFromMetrics(cdr)
}
sq.cdrs = sq.cdrs[currentLength-sq.conf.QueuedItems:]
}
}
if sq.conf.TimeWindow > 0 {
for i, cdr := range sq.cdrs {
if time.Now().Sub(cdr.SetupTime) > sq.conf.TimeWindow {
sq.removeFromMetrics(cdr)
continue
} else {
if i > 0 {
sq.cdrs = sq.cdrs[i:]
}
break
}
break
}
}
}
func (sq *StatsQueue) GetStats() map[string]float64 {
sq.mux.RLock()
defer sq.mux.RUnlock()
return sq.getStats()
}
func (sq *StatsQueue) getStats() map[string]float64 {
stat := make(map[string]float64, len(sq.metrics))
for key, metric := range sq.metrics {
stat[key] = metric.GetValue()
@@ -115,7 +151,7 @@ func (sq *StatsQueue) GetStats() map[string]float64 {
return stat
}
func (sq *StatsQueue) AcceptCDR(cdr *utils.StoredCdr) bool {
func (sq *StatsQueue) acceptCDR(cdr *utils.StoredCdr) bool {
if len(sq.conf.SetupInterval) > 0 {
if cdr.SetupTime.Before(sq.conf.SetupInterval[0]) {
return false

View File

@@ -16,25 +16,24 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package cdrstats
package engine
import (
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
func TestStatsInit(t *testing.T) {
sq := NewStatsQueue(&config.CdrStatsConfig{Metrics: []string{ASR, ACC}})
sq := NewStatsQueue(&CdrStats{Metrics: []string{ASR, ACC}})
if len(sq.metrics) != 2 {
t.Error("Expected 2 metrics got ", len(sq.metrics))
}
}
func TestStatsValue(t *testing.T) {
sq := NewStatsQueue(&config.CdrStatsConfig{Metrics: []string{ASR, ACD, ACC}})
sq := NewStatsQueue(&CdrStats{Metrics: []string{ASR, ACD, ACC}})
cdr := &utils.StoredCdr{
AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC),
Usage: 10 * time.Second,
@@ -72,7 +71,7 @@ func TestStatsSimplifyCDR(t *testing.T) {
Cost: 10,
}
sq := &StatsQueue{}
qcdr := sq.SimplifyCDR(cdr)
qcdr := sq.simplifyCDR(cdr)
if cdr.SetupTime != qcdr.SetupTime ||
cdr.AnswerTime != qcdr.AnswerTime ||
cdr.Usage != qcdr.Usage ||
@@ -100,76 +99,76 @@ func TestAcceptCDR(t *testing.T) {
MediationRunId: "mri",
Cost: 10,
}
sq.conf = &config.CdrStatsConfig{}
if sq.AcceptCDR(cdr) != true {
sq.conf = &CdrStats{}
if sq.acceptCDR(cdr) != true {
t.Error("Should have accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{TOR: []string{"test"}}
if sq.AcceptCDR(cdr) == true {
sq.conf = &CdrStats{TOR: []string{"test"}}
if sq.acceptCDR(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{CdrHost: []string{"test"}}
if sq.AcceptCDR(cdr) == true {
sq.conf = &CdrStats{CdrHost: []string{"test"}}
if sq.acceptCDR(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{CdrSource: []string{"test"}}
if sq.AcceptCDR(cdr) == true {
sq.conf = &CdrStats{CdrSource: []string{"test"}}
if sq.acceptCDR(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{Direction: []string{"test"}}
if sq.AcceptCDR(cdr) == true {
sq.conf = &CdrStats{Direction: []string{"test"}}
if sq.acceptCDR(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{Tenant: []string{"test"}}
if sq.AcceptCDR(cdr) == true {
sq.conf = &CdrStats{Tenant: []string{"test"}}
if sq.acceptCDR(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{Category: []string{"test"}}
if sq.AcceptCDR(cdr) == true {
sq.conf = &CdrStats{Category: []string{"test"}}
if sq.acceptCDR(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{Account: []string{"test"}}
if sq.AcceptCDR(cdr) == true {
sq.conf = &CdrStats{Account: []string{"test"}}
if sq.acceptCDR(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{Subject: []string{"test"}}
if sq.AcceptCDR(cdr) == true {
sq.conf = &CdrStats{Subject: []string{"test"}}
if sq.acceptCDR(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{DestinationPrefix: []string{"test"}}
if sq.AcceptCDR(cdr) == true {
sq.conf = &CdrStats{DestinationPrefix: []string{"test"}}
if sq.acceptCDR(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{DestinationPrefix: []string{"test", "123"}}
if sq.AcceptCDR(cdr) != true {
sq.conf = &CdrStats{DestinationPrefix: []string{"test", "123"}}
if sq.acceptCDR(cdr) != true {
t.Error("Should have accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}}
if sq.AcceptCDR(cdr) == true {
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}}
if sq.acceptCDR(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC)}}
if sq.AcceptCDR(cdr) == true {
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 0, time.UTC)}}
if sq.acceptCDR(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC)}}
if sq.AcceptCDR(cdr) != true {
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC)}}
if sq.acceptCDR(cdr) != true {
t.Error("Should have accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}}
if sq.AcceptCDR(cdr) != true {
sq.conf = &CdrStats{SetupInterval: []time.Time{time.Date(2014, 7, 3, 13, 42, 0, 0, time.UTC), time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC)}}
if sq.acceptCDR(cdr) != true {
t.Error("Should have accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{UsageInterval: []time.Duration{11 * time.Second}}
if sq.AcceptCDR(cdr) == true {
sq.conf = &CdrStats{UsageInterval: []time.Duration{11 * time.Second}}
if sq.acceptCDR(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{UsageInterval: []time.Duration{1 * time.Second, 10 * time.Second}}
if sq.AcceptCDR(cdr) == true {
sq.conf = &CdrStats{UsageInterval: []time.Duration{1 * time.Second, 10 * time.Second}}
if sq.acceptCDR(cdr) == true {
t.Error("Should have NOT accepted thif CDR: %+v", cdr)
}
sq.conf = &config.CdrStatsConfig{UsageInterval: []time.Duration{10 * time.Second, 11 * time.Second}}
if sq.AcceptCDR(cdr) != true {
sq.conf = &CdrStats{UsageInterval: []time.Duration{10 * time.Second, 11 * time.Second}}
if sq.acceptCDR(cdr) != true {
t.Error("Should have accepted thif CDR: %+v", cdr)
}
}