From 56b4f110339d9b64fa3fac27ef041e10692fa3c0 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 1 Oct 2017 15:42:54 +0200 Subject: [PATCH] Engine starting ThresholdS --- cmd/cgr-engine/cgr-engine.go | 29 +++++++++++++++++++ config/config_defaults.go | 6 ++-- data/conf/samples/tutmongo/cgrates.json | 6 ++++ data/conf/samples/tutmysql/cgrates.json | 5 ++++ data/conf/samples/tutpostgres/cgrates.json | 7 +++++ engine/onstor_it_test.go | 7 ++--- engine/thresholds.go | 33 ++++++++++++++-------- 7 files changed, 74 insertions(+), 19 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 80b555f40..1d7bb5f05 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -583,6 +583,30 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg internalStatSChan <- stsV1 } +// startThresholdService fires up the ThresholdS +func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, + dm *engine.DataManager, server *utils.Server, exitChan chan bool) { + tS, err := engine.NewThresholdService(dm, cfg.ThresholdSCfg().FilteredFields, + cfg.ThresholdSCfg().StoreInterval, nil) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) + exitChan <- true + return + } + utils.Logger.Info(fmt.Sprintf("Starting Threshold Service")) + go func() { + if err := tS.ListenAndServe(exitChan); err != nil { + utils.Logger.Crit(fmt.Sprintf(" Error: %s listening for packets", err.Error())) + } + tS.Shutdown() + exitChan <- true + return + }() + tSv1 := v1.NewThresholdSV1(tS) + server.RpcRegister(tSv1) + internalThresholdSChan <- tSv1 +} + func startRpc(server *utils.Server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, internalStatSChan chan rpcclient.RpcClientConnection, internalSMGChan chan *sessionmanager.SMGeneric) { @@ -759,6 +783,7 @@ func main() { internalSMGChan := make(chan *sessionmanager.SMGeneric, 1) internalRsChan := make(chan rpcclient.RpcClientConnection, 1) internalStatSChan := make(chan rpcclient.RpcClientConnection, 1) + internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1) // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, dataDB, exitChan, cacheDoneChan) @@ -859,6 +884,10 @@ func main() { go startStatService(internalStatSChan, cfg, dm, server, exitChan) } + if cfg.ThresholdSCfg().Enabled { + go startThresholdService(internalThresholdSChan, cfg, dm, server, exitChan) + } + // Serve rpc connections go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan) diff --git a/config/config_defaults.go b/config/config_defaults.go index 5eed6a527..5e33955cd 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -412,20 +412,20 @@ const CGRATES_CFG_JSON = ` "resources": { "enabled": false, // starts ResourceLimiter service: . - "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur> + "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur> "stats_conns": [], // address where to reach the stats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> }, "stats": { "enabled": false, // starts Stat service: . - "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur> + "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur> }, "thresholds": { "enabled": false, // starts ThresholdS service: . - "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur> + "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur> "filtered_fields": [], // match filters based on these fields for dynamic filtering, empty to use all }, diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index 50800bd45..d0e3d32d0 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -125,6 +125,12 @@ }, +"thresholds": { + "enabled": true, + "store_interval": "1s", +}, + + "historys": { "enabled": true, }, diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index a2414e4e1..43c94761e 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -117,6 +117,11 @@ "store_interval": "1s", }, +"thresholds": { + "enabled": true, + "store_interval": "1s", +}, + "historys": { "enabled": true, diff --git a/data/conf/samples/tutpostgres/cgrates.json b/data/conf/samples/tutpostgres/cgrates.json index 634b3af0b..7fd54ef10 100644 --- a/data/conf/samples/tutpostgres/cgrates.json +++ b/data/conf/samples/tutpostgres/cgrates.json @@ -82,6 +82,13 @@ }, + +"thresholds": { + "enabled": true, + "store_interval": "1s", +}, + + "sm_generic": { "enabled": true, }, diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 1703fab4d..125011fcb 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -2091,10 +2091,9 @@ func testOnStorITCRUDThresholdProfile(t *testing.T) { func testOnStorITCRUDThreshold(t *testing.T) { res := &Threshold{ - Tenant: "cgrates.org", - ID: "TH1", - LastExecuted: time.Date(2016, 10, 1, 0, 0, 0, 0, time.UTC).Local(), - WakeupTime: time.Date(2016, 10, 1, 0, 0, 0, 0, time.UTC).Local(), + Tenant: "cgrates.org", + ID: "TH1", + Snooze: time.Date(2016, 10, 1, 0, 0, 0, 0, time.UTC).Local(), } if _, rcvErr := onStor.GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { t.Error(rcvErr) diff --git a/engine/thresholds.go b/engine/thresholds.go index 7461a5f73..b21a67ff1 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -121,23 +121,23 @@ func (ts Thresholds) Sort() { } func NewThresholdService(dm *DataManager, filteredFields []string, storeInterval time.Duration, - statS rpcclient.RpcClientConnection) (tS *ThresholdService, err error) { + statS *rpcclient.RpcClientPool) (tS *ThresholdService, err error) { return &ThresholdService{dm: dm, - filterFields: filterFields, - storeInterval: storeInterval, - statS: statS, - stopBackup: make(chan struct{})}, nil + filteredFields: filteredFields, + storeInterval: storeInterval, + statS: statS, + stopBackup: make(chan struct{})}, nil } // ThresholdService manages Threshold execution and storing them to dataDB type ThresholdService struct { - dm *DataManager - filterFields []string // fields considered when searching for matching thresholds - storeInterval time.Duration - statS rpcclient.RpcClientConnection // allows applying filters based on stats - stopBackup chan struct{} - storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool - stMux sync.RWMutex // protects storedTdIDs + dm *DataManager + filteredFields []string // fields considered when searching for matching thresholds + storeInterval time.Duration + statS *rpcclient.RpcClientPool // allows applying filters based on stats + stopBackup chan struct{} + storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool + stMux sync.RWMutex // protects storedTdIDs } // Called to start the service @@ -148,6 +148,15 @@ func (tS *ThresholdService) ListenAndServe(exitChan chan bool) error { return nil } +// Shutdown is called to shutdown the service +func (tS *ThresholdService) Shutdown() error { + utils.Logger.Info(" shutdown initialized") + close(tS.stopBackup) + tS.storeThresholds() + utils.Logger.Info(" shutdown complete") + return nil +} + // backup will regularly store resources changed to dataDB func (tS *ThresholdService) runBackup() { if tS.storeInterval <= 0 {