diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index 8481f96ca..ea21de60e 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -45,69 +45,63 @@ var tEvs = []*engine.ThresholdEvent{ &engine.ThresholdEvent{ // hitting THD_ACNT_BALANCE_1 Tenant: "cgrates.org", ID: "event1", - Fields: map[string]interface{}{ + Event: map[string]interface{}{ utils.EventType: utils.AccountUpdate, - utils.EventSource: utils.AccountService, utils.ACCOUNT: "1002", utils.AllowNegative: true, utils.Disabled: false}}, &engine.ThresholdEvent{ // hitting THD_ACNT_BALANCE_1 Tenant: "cgrates.org", ID: "event2", - Fields: map[string]interface{}{ - utils.EventType: utils.BalanceUpdate, - utils.EventSource: utils.AccountService, - utils.ACCOUNT: "1002", - utils.BalanceID: utils.META_DEFAULT, - utils.Units: 12.3, - utils.ExpiryTime: "2009-11-10T23:00:00Z"}}, + Event: map[string]interface{}{ + utils.EventType: utils.BalanceUpdate, + utils.ACCOUNT: "1002", + utils.BalanceID: utils.META_DEFAULT, + utils.Units: 12.3, + utils.ExpiryTime: "2009-11-10T23:00:00Z"}}, &engine.ThresholdEvent{ // hitting THD_STATS_1 Tenant: "cgrates.org", ID: "event3", - Fields: map[string]interface{}{ - utils.EventType: utils.StatUpdate, - utils.EventSource: utils.StatService, - utils.StatID: "Stats1", - utils.ACCOUNT: "1002", - "ASR": 35.0, - "ACD": "2m45s", - "TCC": 12.7, - "TCD": "12m15s", - "ACC": 0.75, - "PDD": "2s", + Event: map[string]interface{}{ + utils.EventType: utils.StatUpdate, + utils.StatID: "Stats1", + utils.ACCOUNT: "1002", + "ASR": 35.0, + "ACD": "2m45s", + "TCC": 12.7, + "TCD": "12m15s", + "ACC": 0.75, + "PDD": "2s", }}, &engine.ThresholdEvent{ // hitting THD_STATS_1 and THD_STATS_2 Tenant: "cgrates.org", ID: "event4", - Fields: map[string]interface{}{ - utils.EventType: utils.StatUpdate, - utils.EventSource: utils.StatService, - utils.StatID: "STATS_HOURLY_DE", - utils.ACCOUNT: "1002", - "ASR": 35.0, - "ACD": "2m45s", - "TCD": "1h", + Event: map[string]interface{}{ + utils.EventType: utils.StatUpdate, + utils.StatID: "STATS_HOURLY_DE", + utils.ACCOUNT: "1002", + "ASR": 35.0, + "ACD": "2m45s", + "TCD": "1h", }}, &engine.ThresholdEvent{ // hitting THD_STATS_3 Tenant: "cgrates.org", ID: "event5", - Fields: map[string]interface{}{ - utils.EventType: utils.StatUpdate, - utils.EventSource: utils.StatService, - utils.StatID: "STATS_DAILY_DE", - utils.ACCOUNT: "1002", - "ACD": "2m45s", - "TCD": "3h1s", + Event: map[string]interface{}{ + utils.EventType: utils.StatUpdate, + utils.StatID: "STATS_DAILY_DE", + utils.ACCOUNT: "1002", + "ACD": "2m45s", + "TCD": "3h1s", }}, &engine.ThresholdEvent{ // hitting THD_RES_1 Tenant: "cgrates.org", ID: "event6", - Fields: map[string]interface{}{ - utils.EventType: utils.ResourceUpdate, - utils.EventSource: utils.ResourceS, - utils.ACCOUNT: "1002", - utils.ResourceID: "RES_GRP_1", - utils.USAGE: 10.0}}, + Event: map[string]interface{}{ + utils.EventType: utils.ResourceUpdate, + utils.ACCOUNT: "1002", + utils.ResourceID: "RES_GRP_1", + utils.USAGE: 10.0}}, } var sTestsThresholdSV1 = []func(t *testing.T){ diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ff3c40776..a02f14013 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -561,9 +561,19 @@ func startResourceService(internalRsChan, internalStatSConn chan rpcclient.RpcCl } // startStatService fires up the StatS -func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, +func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, exitChan chan bool) { - sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval) + var thdSConn *rpcclient.RpcClientPool + if len(cfg.StatSCfg().ThresholdSConns) != 0 { // Stats connection init + thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.StatSCfg().ThresholdSConns, internalThresholdSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to ThresholdS: %s", err.Error())) + exitChan <- true + return + } + } + sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval, thdSConn) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) exitChan <- true @@ -882,7 +892,7 @@ func main() { } if cfg.StatSCfg().Enabled { - go startStatService(internalStatSChan, cfg, dm, server, exitChan) + go startStatService(internalStatSChan, internalThresholdSChan, cfg, dm, server, exitChan) } if cfg.ThresholdSCfg().Enabled { diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index 54e2ea9d2..0a15be732 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -125,6 +125,9 @@ "stats": { "enabled": true, "store_interval": "1s", + "thresholds_conns": [ + {"address": "*internal"} + ], }, diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index 39eb37ae1..05d3b26aa 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -118,6 +118,9 @@ "stats": { "enabled": true, "store_interval": "1s", + "thresholds_conns": [ + {"address": "*internal"} + ], }, "thresholds": { diff --git a/data/conf/samples/tutpostgres/cgrates.json b/data/conf/samples/tutpostgres/cgrates.json index 146170ec2..5155b969d 100644 --- a/data/conf/samples/tutpostgres/cgrates.json +++ b/data/conf/samples/tutpostgres/cgrates.json @@ -82,6 +82,9 @@ "stats": { "enabled": true, "store_interval": "1s", + "thresholds_conns": [ + {"address": "*internal"} + ], }, diff --git a/data/tariffplans/tutorial/Thresholds.csv b/data/tariffplans/tutorial/Thresholds.csv index 5fd122255..9cfbe904c 100644 --- a/data/tariffplans/tutorial/Thresholds.csv +++ b/data/tariffplans/tutorial/Thresholds.csv @@ -4,15 +4,15 @@ cgrates.org,THD_ACNT_BALANCE_1,*string,EventType,BalanceUpdate,,,,,,, cgrates.org,THD_ACNT_BALANCE_1,*gte,Units,10.0,,,,,,, cgrates.org,THD_ACNT_EXPIRED,*string,Account,1001;1002,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING, cgrates.org,THD_ACNT_EXPIRED,*gte,ExpiryTime,*now,,,,,,, -cgrates.org,THD_STATS_1,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING, +cgrates.org,THD_STATS_1,*string,EventType,StatUpdate,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING, cgrates.org,THD_STATS_1,*lt,ASR,40.0,,,,,,, cgrates.org,THD_STATS_1,*lt,ACD,3m,,,,,,, -cgrates.org,THD_STATS_2,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,false,10,DISABLE_AND_LOG, +cgrates.org,THD_STATS_2,*string,EventType,StatUpdate,2014-07-29T15:00:00Z,true,1s,false,10,DISABLE_AND_LOG, cgrates.org,THD_STATS_2,*string,StatID,STATS_HOURLY_DE,,,,,,, cgrates.org,THD_STATS_2,*gt,TCD,30m,,,,,,, -cgrates.org,THD_STATS_3,*string,EventSource,StatS,2014-07-29T15:00:00Z,false,1s,false,10,TOPUP_100SMS_DE_MOBILE, +cgrates.org,THD_STATS_3,*string,EventType,StatUpdate,2014-07-29T15:00:00Z,false,1s,false,10,TOPUP_100SMS_DE_MOBILE, cgrates.org,THD_STATS_3,*string,StatID,STATS_DAILY_DE,,,,,,, cgrates.org,THD_STATS_3,*gt,TCD,3h,,,,,,, -cgrates.org,THD_RES_1,*string,EventSource,ResourceS,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING, +cgrates.org,THD_RES_1,*string,EventType,ResourceUpdate,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING, cgrates.org,THD_RES_1,*string,ResourceID,RES_GRP_1,,,,,,, cgrates.org,THD_RES_1,*gte,Usage,10.0,,,,,,, diff --git a/engine/balances.go b/engine/balances.go index 08e13e252..e752c4863 100644 --- a/engine/balances.go +++ b/engine/balances.go @@ -782,18 +782,19 @@ func (bc Balances) SaveDirtyBalances(acc *Account) { ev := &ThresholdEvent{ Tenant: acntTnt.Tenant, ID: utils.GenUUID(), - Fields: map[string]interface{}{ + Event: map[string]interface{}{ utils.EventType: utils.BalanceUpdate, utils.EventSource: utils.AccountService, utils.ACCOUNT: acntTnt.ID, utils.BalanceID: b.ID, utils.Units: b.Value}} if !b.ExpirationDate.IsZero() { - ev.Fields[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339) + ev.Event[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339) } var hits int - if err := thresholdS.Call("ThresholdSV1.ProcessEvent", ev, &hits); err != nil { - utils.Logger.Warning(fmt.Sprintf(" error: %s processing balance event %+v with thresholds.", err.Error(), ev)) + if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent, ev, &hits); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" error: %s processing balance event %+v with ThresholdS.", err.Error(), ev)) } } //utils.LogStack() @@ -829,7 +830,7 @@ func (bc Balances) SaveDirtyBalances(acc *Account) { ev := &ThresholdEvent{ Tenant: acntTnt.Tenant, ID: utils.GenUUID(), - Fields: map[string]interface{}{ + Event: map[string]interface{}{ utils.EventType: utils.AccountUpdate, utils.EventSource: utils.AccountService, utils.ACCOUNT: acntTnt.ID, @@ -838,7 +839,7 @@ func (bc Balances) SaveDirtyBalances(acc *Account) { var hits int if err := thresholdS.Call("ThresholdSV1.ProcessEvent", ev, &hits); err != nil { utils.Logger.Warning( - fmt.Sprintf(" error: %s processing account event %+v with thresholds.", err.Error(), ev)) + fmt.Sprintf(" error: %s processing account event %+v with ThresholdS.", err.Error(), ev)) } } diff --git a/engine/cdrs.go b/engine/cdrs.go index dc0cfa010..e83c0064c 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -71,22 +71,22 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, rater, pubsub, users, aliases, cdrstats, stats rpcclient.RpcClientConnection) (*CdrServer, error) { - if rater == nil || reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value, faster to check here than in CdrServer code + if rater != nil && reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value, faster to check here than in CdrServer code rater = nil } - if pubsub == nil || reflect.ValueOf(pubsub).IsNil() { + if pubsub != nil && reflect.ValueOf(pubsub).IsNil() { pubsub = nil } - if users == nil || reflect.ValueOf(users).IsNil() { + if users != nil && reflect.ValueOf(users).IsNil() { users = nil } - if aliases == nil || reflect.ValueOf(aliases).IsNil() { + if aliases != nil && reflect.ValueOf(aliases).IsNil() { aliases = nil } - if cdrstats == nil || reflect.ValueOf(cdrstats).IsNil() { + if cdrstats != nil && reflect.ValueOf(cdrstats).IsNil() { cdrstats = nil } - if stats == nil || reflect.ValueOf(stats).IsNil() { + if stats != nil && reflect.ValueOf(stats).IsNil() { stats = nil } return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm, diff --git a/engine/stats.go b/engine/stats.go index 51a6455bd..a66063c19 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -21,6 +21,7 @@ package engine import ( "fmt" "math/rand" + "reflect" "sync" "time" @@ -28,12 +29,19 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) // NewStatService initializes a StatService -func NewStatService(dm *DataManager, storeInterval time.Duration) (ss *StatService, err error) { - return &StatService{dm: dm, +func NewStatService(dm *DataManager, storeInterval time.Duration, + thdS rpcclient.RpcClientConnection) (ss *StatService, err error) { + if thdS != nil && reflect.ValueOf(thdS).IsNil() { // fix nil value in interface + thdS = nil + } + return &StatService{ + dm: dm, storeInterval: storeInterval, + thdS: thdS, storedStatQueues: make(utils.StringMap), stopBackup: make(chan struct{})}, nil } @@ -42,6 +50,7 @@ func NewStatService(dm *DataManager, storeInterval time.Duration) (ss *StatServi type StatService struct { dm *DataManager storeInterval time.Duration + thdS rpcclient.RpcClientConnection // rpc connection towards ThresholdS stopBackup chan struct{} storedStatQueues utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool ssqMux sync.RWMutex // protects storedStatQueues @@ -209,7 +218,7 @@ func (sS *StatService) processEvent(ev *StatEvent) (err error) { for _, sq := range matchSQs { if err = sq.ProcessEvent(ev); err != nil { utils.Logger.Warning( - fmt.Sprintf(" Queue: %s, ignoring event: %s, error: %s", + fmt.Sprintf(" Queue: %s, ignoring event: %s, error: %s", sq.TenantID(), ev.TenantID(), err.Error())) withErrors = true } @@ -224,6 +233,23 @@ func (sS *StatService) processEvent(ev *StatEvent) (err error) { sS.storedStatQueues[sq.TenantID()] = true sS.ssqMux.Unlock() } + if sS.thdS != nil { + ev := &ThresholdEvent{ + Tenant: sq.Tenant, + ID: utils.GenUUID(), + Event: map[string]interface{}{ + utils.EventType: utils.StatUpdate, + utils.StatID: sq.ID}} + for metricID, metric := range sq.SQMetrics { + ev.Event[metricID] = metric.GetValue() + } + var hits int + if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent, ev, &hits); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" error: %s processing event %+v with ThresholdS.", err.Error(), ev)) + withErrors = true + } + } } if withErrors { err = utils.ErrPartiallyExecuted diff --git a/engine/thresholds.go b/engine/thresholds.go index 27bcbd3e6..61d7aad8f 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -54,7 +54,7 @@ func (tp *ThresholdProfile) TenantID() string { type ThresholdEvent struct { Tenant string ID string - Fields map[string]interface{} + Event map[string]interface{} } func (te *ThresholdEvent) TenantID() string { @@ -62,7 +62,7 @@ func (te *ThresholdEvent) TenantID() string { } func (te *ThresholdEvent) Account() (acnt string, err error) { - acntIf, has := te.Fields[utils.ACCOUNT] + acntIf, has := te.Event[utils.ACCOUNT] if !has { return "", utils.ErrNotFound } @@ -77,14 +77,14 @@ func (te *ThresholdEvent) FilterableEvent(fltredFields []string) (fEv map[string fEv = make(map[string]interface{}) if len(fltredFields) == 0 { i := 0 - fltredFields = make([]string, len(te.Fields)) - for k := range te.Fields { + fltredFields = make([]string, len(te.Event)) + for k := range te.Event { fltredFields[i] = k i++ } } for _, fltrFld := range fltredFields { - fldVal, has := te.Fields[fltrFld] + fldVal, has := te.Event[fltrFld] if !has { continue // the field does not exist in map, ignore it } @@ -250,7 +250,7 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) { // matchingThresholdsForEvent returns ordered list of matching thresholds which are active for an Event func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts Thresholds, err error) { matchingTs := make(map[string]*Threshold) - tIDs, err := matchingItemIDsForEvent(ev.Fields, tS.dm, utils.ThresholdsIndex+ev.Tenant) + tIDs, err := matchingItemIDsForEvent(ev.Event, tS.dm, utils.ThresholdsIndex+ev.Tenant) if err != nil { return nil, err } diff --git a/utils/consts.go b/utils/consts.go index 2276ac14a..214959d67 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -482,6 +482,7 @@ const ( AllowNegative = "AllowNegative" Disabled = "Disabled" Action = "Action" + ThresholdSv1ProcessEvent = "ThresholdSv1.ProcessEvent" ) func buildCacheInstRevPrefixes() {