StatS with ThresholdS, ThresholdEvent.Fields -> ThresholdEvent.Event

This commit is contained in:
DanB
2017-10-22 13:31:17 +02:00
parent 26fbde9639
commit 6980e177d1
11 changed files with 110 additions and 69 deletions

View File

@@ -45,69 +45,63 @@ var tEvs = []*engine.ThresholdEvent{
&engine.ThresholdEvent{ // hitting THD_ACNT_BALANCE_1
Tenant: "cgrates.org",
ID: "event1",
Fields: map[string]interface{}{
Event: map[string]interface{}{
utils.EventType: utils.AccountUpdate,
utils.EventSource: utils.AccountService,
utils.ACCOUNT: "1002",
utils.AllowNegative: true,
utils.Disabled: false}},
&engine.ThresholdEvent{ // hitting THD_ACNT_BALANCE_1
Tenant: "cgrates.org",
ID: "event2",
Fields: map[string]interface{}{
utils.EventType: utils.BalanceUpdate,
utils.EventSource: utils.AccountService,
utils.ACCOUNT: "1002",
utils.BalanceID: utils.META_DEFAULT,
utils.Units: 12.3,
utils.ExpiryTime: "2009-11-10T23:00:00Z"}},
Event: map[string]interface{}{
utils.EventType: utils.BalanceUpdate,
utils.ACCOUNT: "1002",
utils.BalanceID: utils.META_DEFAULT,
utils.Units: 12.3,
utils.ExpiryTime: "2009-11-10T23:00:00Z"}},
&engine.ThresholdEvent{ // hitting THD_STATS_1
Tenant: "cgrates.org",
ID: "event3",
Fields: map[string]interface{}{
utils.EventType: utils.StatUpdate,
utils.EventSource: utils.StatService,
utils.StatID: "Stats1",
utils.ACCOUNT: "1002",
"ASR": 35.0,
"ACD": "2m45s",
"TCC": 12.7,
"TCD": "12m15s",
"ACC": 0.75,
"PDD": "2s",
Event: map[string]interface{}{
utils.EventType: utils.StatUpdate,
utils.StatID: "Stats1",
utils.ACCOUNT: "1002",
"ASR": 35.0,
"ACD": "2m45s",
"TCC": 12.7,
"TCD": "12m15s",
"ACC": 0.75,
"PDD": "2s",
}},
&engine.ThresholdEvent{ // hitting THD_STATS_1 and THD_STATS_2
Tenant: "cgrates.org",
ID: "event4",
Fields: map[string]interface{}{
utils.EventType: utils.StatUpdate,
utils.EventSource: utils.StatService,
utils.StatID: "STATS_HOURLY_DE",
utils.ACCOUNT: "1002",
"ASR": 35.0,
"ACD": "2m45s",
"TCD": "1h",
Event: map[string]interface{}{
utils.EventType: utils.StatUpdate,
utils.StatID: "STATS_HOURLY_DE",
utils.ACCOUNT: "1002",
"ASR": 35.0,
"ACD": "2m45s",
"TCD": "1h",
}},
&engine.ThresholdEvent{ // hitting THD_STATS_3
Tenant: "cgrates.org",
ID: "event5",
Fields: map[string]interface{}{
utils.EventType: utils.StatUpdate,
utils.EventSource: utils.StatService,
utils.StatID: "STATS_DAILY_DE",
utils.ACCOUNT: "1002",
"ACD": "2m45s",
"TCD": "3h1s",
Event: map[string]interface{}{
utils.EventType: utils.StatUpdate,
utils.StatID: "STATS_DAILY_DE",
utils.ACCOUNT: "1002",
"ACD": "2m45s",
"TCD": "3h1s",
}},
&engine.ThresholdEvent{ // hitting THD_RES_1
Tenant: "cgrates.org",
ID: "event6",
Fields: map[string]interface{}{
utils.EventType: utils.ResourceUpdate,
utils.EventSource: utils.ResourceS,
utils.ACCOUNT: "1002",
utils.ResourceID: "RES_GRP_1",
utils.USAGE: 10.0}},
Event: map[string]interface{}{
utils.EventType: utils.ResourceUpdate,
utils.ACCOUNT: "1002",
utils.ResourceID: "RES_GRP_1",
utils.USAGE: 10.0}},
}
var sTestsThresholdSV1 = []func(t *testing.T){

View File

@@ -561,9 +561,19 @@ func startResourceService(internalRsChan, internalStatSConn chan rpcclient.RpcCl
}
// startStatService fires up the StatS
func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dm *engine.DataManager, server *utils.Server, exitChan chan bool) {
sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval)
var thdSConn *rpcclient.RpcClientPool
if len(cfg.StatSCfg().ThresholdSConns) != 0 { // Stats connection init
thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.StatSCfg().ThresholdSConns, internalThresholdSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not connect to ThresholdS: %s", err.Error()))
exitChan <- true
return
}
}
sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval, thdSConn)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not init, error: %s", err.Error()))
exitChan <- true
@@ -882,7 +892,7 @@ func main() {
}
if cfg.StatSCfg().Enabled {
go startStatService(internalStatSChan, cfg, dm, server, exitChan)
go startStatService(internalStatSChan, internalThresholdSChan, cfg, dm, server, exitChan)
}
if cfg.ThresholdSCfg().Enabled {

View File

@@ -125,6 +125,9 @@
"stats": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
},

View File

@@ -118,6 +118,9 @@
"stats": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
},
"thresholds": {

View File

@@ -82,6 +82,9 @@
"stats": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
},

View File

@@ -4,15 +4,15 @@ cgrates.org,THD_ACNT_BALANCE_1,*string,EventType,BalanceUpdate,,,,,,,
cgrates.org,THD_ACNT_BALANCE_1,*gte,Units,10.0,,,,,,,
cgrates.org,THD_ACNT_EXPIRED,*string,Account,1001;1002,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
cgrates.org,THD_ACNT_EXPIRED,*gte,ExpiryTime,*now,,,,,,,
cgrates.org,THD_STATS_1,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
cgrates.org,THD_STATS_1,*string,EventType,StatUpdate,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
cgrates.org,THD_STATS_1,*lt,ASR,40.0,,,,,,,
cgrates.org,THD_STATS_1,*lt,ACD,3m,,,,,,,
cgrates.org,THD_STATS_2,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,false,10,DISABLE_AND_LOG,
cgrates.org,THD_STATS_2,*string,EventType,StatUpdate,2014-07-29T15:00:00Z,true,1s,false,10,DISABLE_AND_LOG,
cgrates.org,THD_STATS_2,*string,StatID,STATS_HOURLY_DE,,,,,,,
cgrates.org,THD_STATS_2,*gt,TCD,30m,,,,,,,
cgrates.org,THD_STATS_3,*string,EventSource,StatS,2014-07-29T15:00:00Z,false,1s,false,10,TOPUP_100SMS_DE_MOBILE,
cgrates.org,THD_STATS_3,*string,EventType,StatUpdate,2014-07-29T15:00:00Z,false,1s,false,10,TOPUP_100SMS_DE_MOBILE,
cgrates.org,THD_STATS_3,*string,StatID,STATS_DAILY_DE,,,,,,,
cgrates.org,THD_STATS_3,*gt,TCD,3h,,,,,,,
cgrates.org,THD_RES_1,*string,EventSource,ResourceS,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
cgrates.org,THD_RES_1,*string,EventType,ResourceUpdate,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
cgrates.org,THD_RES_1,*string,ResourceID,RES_GRP_1,,,,,,,
cgrates.org,THD_RES_1,*gte,Usage,10.0,,,,,,,
1 #Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],Recurrent[6],MinHits[7],MinSleep[8],Blocker[9],Weight[10],ActionIDs[11],Async[12]
4 cgrates.org,THD_ACNT_BALANCE_1,*gte,Units,10.0,,,,,,,
5 cgrates.org,THD_ACNT_EXPIRED,*string,Account,1001;1002,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
6 cgrates.org,THD_ACNT_EXPIRED,*gte,ExpiryTime,*now,,,,,,,
7 cgrates.org,THD_STATS_1,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING, cgrates.org,THD_STATS_1,*string,EventType,StatUpdate,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
8 cgrates.org,THD_STATS_1,*lt,ASR,40.0,,,,,,,
9 cgrates.org,THD_STATS_1,*lt,ACD,3m,,,,,,,
10 cgrates.org,THD_STATS_2,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,false,10,DISABLE_AND_LOG, cgrates.org,THD_STATS_2,*string,EventType,StatUpdate,2014-07-29T15:00:00Z,true,1s,false,10,DISABLE_AND_LOG,
11 cgrates.org,THD_STATS_2,*string,StatID,STATS_HOURLY_DE,,,,,,,
12 cgrates.org,THD_STATS_2,*gt,TCD,30m,,,,,,,
13 cgrates.org,THD_STATS_3,*string,EventSource,StatS,2014-07-29T15:00:00Z,false,1s,false,10,TOPUP_100SMS_DE_MOBILE, cgrates.org,THD_STATS_3,*string,EventType,StatUpdate,2014-07-29T15:00:00Z,false,1s,false,10,TOPUP_100SMS_DE_MOBILE,
14 cgrates.org,THD_STATS_3,*string,StatID,STATS_DAILY_DE,,,,,,,
15 cgrates.org,THD_STATS_3,*gt,TCD,3h,,,,,,,
16 cgrates.org,THD_RES_1,*string,EventSource,ResourceS,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING, cgrates.org,THD_RES_1,*string,EventType,ResourceUpdate,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
17 cgrates.org,THD_RES_1,*string,ResourceID,RES_GRP_1,,,,,,,
18 cgrates.org,THD_RES_1,*gte,Usage,10.0,,,,,,,

View File

@@ -782,18 +782,19 @@ func (bc Balances) SaveDirtyBalances(acc *Account) {
ev := &ThresholdEvent{
Tenant: acntTnt.Tenant,
ID: utils.GenUUID(),
Fields: map[string]interface{}{
Event: map[string]interface{}{
utils.EventType: utils.BalanceUpdate,
utils.EventSource: utils.AccountService,
utils.ACCOUNT: acntTnt.ID,
utils.BalanceID: b.ID,
utils.Units: b.Value}}
if !b.ExpirationDate.IsZero() {
ev.Fields[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339)
ev.Event[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339)
}
var hits int
if err := thresholdS.Call("ThresholdSV1.ProcessEvent", ev, &hits); err != nil {
utils.Logger.Warning(fmt.Sprintf("<AccountS> error: %s processing balance event %+v with thresholds.", err.Error(), ev))
if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent, ev, &hits); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing balance event %+v with ThresholdS.", err.Error(), ev))
}
}
//utils.LogStack()
@@ -829,7 +830,7 @@ func (bc Balances) SaveDirtyBalances(acc *Account) {
ev := &ThresholdEvent{
Tenant: acntTnt.Tenant,
ID: utils.GenUUID(),
Fields: map[string]interface{}{
Event: map[string]interface{}{
utils.EventType: utils.AccountUpdate,
utils.EventSource: utils.AccountService,
utils.ACCOUNT: acntTnt.ID,
@@ -838,7 +839,7 @@ func (bc Balances) SaveDirtyBalances(acc *Account) {
var hits int
if err := thresholdS.Call("ThresholdSV1.ProcessEvent", ev, &hits); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing account event %+v with thresholds.", err.Error(), ev))
fmt.Sprintf("<AccountS> error: %s processing account event %+v with ThresholdS.", err.Error(), ev))
}
}

View File

@@ -71,22 +71,22 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, rater, pubsub, users,
aliases, cdrstats, stats rpcclient.RpcClientConnection) (*CdrServer, error) {
if rater == nil || reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value, faster to check here than in CdrServer code
if rater != nil && reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value, faster to check here than in CdrServer code
rater = nil
}
if pubsub == nil || reflect.ValueOf(pubsub).IsNil() {
if pubsub != nil && reflect.ValueOf(pubsub).IsNil() {
pubsub = nil
}
if users == nil || reflect.ValueOf(users).IsNil() {
if users != nil && reflect.ValueOf(users).IsNil() {
users = nil
}
if aliases == nil || reflect.ValueOf(aliases).IsNil() {
if aliases != nil && reflect.ValueOf(aliases).IsNil() {
aliases = nil
}
if cdrstats == nil || reflect.ValueOf(cdrstats).IsNil() {
if cdrstats != nil && reflect.ValueOf(cdrstats).IsNil() {
cdrstats = nil
}
if stats == nil || reflect.ValueOf(stats).IsNil() {
if stats != nil && reflect.ValueOf(stats).IsNil() {
stats = nil
}
return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm,

View File

@@ -21,6 +21,7 @@ package engine
import (
"fmt"
"math/rand"
"reflect"
"sync"
"time"
@@ -28,12 +29,19 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewStatService initializes a StatService
func NewStatService(dm *DataManager, storeInterval time.Duration) (ss *StatService, err error) {
return &StatService{dm: dm,
func NewStatService(dm *DataManager, storeInterval time.Duration,
thdS rpcclient.RpcClientConnection) (ss *StatService, err error) {
if thdS != nil && reflect.ValueOf(thdS).IsNil() { // fix nil value in interface
thdS = nil
}
return &StatService{
dm: dm,
storeInterval: storeInterval,
thdS: thdS,
storedStatQueues: make(utils.StringMap),
stopBackup: make(chan struct{})}, nil
}
@@ -42,6 +50,7 @@ func NewStatService(dm *DataManager, storeInterval time.Duration) (ss *StatServi
type StatService struct {
dm *DataManager
storeInterval time.Duration
thdS rpcclient.RpcClientConnection // rpc connection towards ThresholdS
stopBackup chan struct{}
storedStatQueues utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool
ssqMux sync.RWMutex // protects storedStatQueues
@@ -209,7 +218,7 @@ func (sS *StatService) processEvent(ev *StatEvent) (err error) {
for _, sq := range matchSQs {
if err = sq.ProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<StatService> Queue: %s, ignoring event: %s, error: %s",
fmt.Sprintf("<StatS> Queue: %s, ignoring event: %s, error: %s",
sq.TenantID(), ev.TenantID(), err.Error()))
withErrors = true
}
@@ -224,6 +233,23 @@ func (sS *StatService) processEvent(ev *StatEvent) (err error) {
sS.storedStatQueues[sq.TenantID()] = true
sS.ssqMux.Unlock()
}
if sS.thdS != nil {
ev := &ThresholdEvent{
Tenant: sq.Tenant,
ID: utils.GenUUID(),
Event: map[string]interface{}{
utils.EventType: utils.StatUpdate,
utils.StatID: sq.ID}}
for metricID, metric := range sq.SQMetrics {
ev.Event[metricID] = metric.GetValue()
}
var hits int
if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent, ev, &hits); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<StatS> error: %s processing event %+v with ThresholdS.", err.Error(), ev))
withErrors = true
}
}
}
if withErrors {
err = utils.ErrPartiallyExecuted

View File

@@ -54,7 +54,7 @@ func (tp *ThresholdProfile) TenantID() string {
type ThresholdEvent struct {
Tenant string
ID string
Fields map[string]interface{}
Event map[string]interface{}
}
func (te *ThresholdEvent) TenantID() string {
@@ -62,7 +62,7 @@ func (te *ThresholdEvent) TenantID() string {
}
func (te *ThresholdEvent) Account() (acnt string, err error) {
acntIf, has := te.Fields[utils.ACCOUNT]
acntIf, has := te.Event[utils.ACCOUNT]
if !has {
return "", utils.ErrNotFound
}
@@ -77,14 +77,14 @@ func (te *ThresholdEvent) FilterableEvent(fltredFields []string) (fEv map[string
fEv = make(map[string]interface{})
if len(fltredFields) == 0 {
i := 0
fltredFields = make([]string, len(te.Fields))
for k := range te.Fields {
fltredFields = make([]string, len(te.Event))
for k := range te.Event {
fltredFields[i] = k
i++
}
}
for _, fltrFld := range fltredFields {
fldVal, has := te.Fields[fltrFld]
fldVal, has := te.Event[fltrFld]
if !has {
continue // the field does not exist in map, ignore it
}
@@ -250,7 +250,7 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) {
// matchingThresholdsForEvent returns ordered list of matching thresholds which are active for an Event
func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts Thresholds, err error) {
matchingTs := make(map[string]*Threshold)
tIDs, err := matchingItemIDsForEvent(ev.Fields, tS.dm, utils.ThresholdsIndex+ev.Tenant)
tIDs, err := matchingItemIDsForEvent(ev.Event, tS.dm, utils.ThresholdsIndex+ev.Tenant)
if err != nil {
return nil, err
}

View File

@@ -482,6 +482,7 @@ const (
AllowNegative = "AllowNegative"
Disabled = "Disabled"
Action = "Action"
ThresholdSv1ProcessEvent = "ThresholdSv1.ProcessEvent"
)
func buildCacheInstRevPrefixes() {