Engine starting ThresholdS

This commit is contained in:
DanB
2017-10-01 15:42:54 +02:00
parent 3181c0c535
commit 56b4f11033
7 changed files with 74 additions and 19 deletions

View File

@@ -583,6 +583,30 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg
internalStatSChan <- stsV1
}
// startThresholdService fires up the ThresholdS
func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dm *engine.DataManager, server *utils.Server, exitChan chan bool) {
tS, err := engine.NewThresholdService(dm, cfg.ThresholdSCfg().FilteredFields,
cfg.ThresholdSCfg().StoreInterval, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<ThresholdS> Could not init, error: %s", err.Error()))
exitChan <- true
return
}
utils.Logger.Info(fmt.Sprintf("Starting Threshold Service"))
go func() {
if err := tS.ListenAndServe(exitChan); err != nil {
utils.Logger.Crit(fmt.Sprintf("<ThresholdS> Error: %s listening for packets", err.Error()))
}
tS.Shutdown()
exitChan <- true
return
}()
tSv1 := v1.NewThresholdSV1(tS)
server.RpcRegister(tSv1)
internalThresholdSChan <- tSv1
}
func startRpc(server *utils.Server, internalRaterChan,
internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan,
internalAliaseSChan, internalRsChan, internalStatSChan chan rpcclient.RpcClientConnection, internalSMGChan chan *sessionmanager.SMGeneric) {
@@ -759,6 +783,7 @@ func main() {
internalSMGChan := make(chan *sessionmanager.SMGeneric, 1)
internalRsChan := make(chan rpcclient.RpcClientConnection, 1)
internalStatSChan := make(chan rpcclient.RpcClientConnection, 1)
internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, dataDB, exitChan, cacheDoneChan)
@@ -859,6 +884,10 @@ func main() {
go startStatService(internalStatSChan, cfg, dm, server, exitChan)
}
if cfg.ThresholdSCfg().Enabled {
go startThresholdService(internalThresholdSChan, cfg, dm, server, exitChan)
}
// Serve rpc connections
go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan,
internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan)

View File

@@ -412,20 +412,20 @@ const CGRATES_CFG_JSON = `
"resources": {
"enabled": false, // starts ResourceLimiter service: <true|false>.
"store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur>
"store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur>
"stats_conns": [], // address where to reach the stats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
},
"stats": {
"enabled": false, // starts Stat service: <true|false>.
"store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur>
"store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur>
},
"thresholds": {
"enabled": false, // starts ThresholdS service: <true|false>.
"store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|$dur>
"store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur>
"filtered_fields": [], // match filters based on these fields for dynamic filtering, empty to use all
},

View File

@@ -125,6 +125,12 @@
},
"thresholds": {
"enabled": true,
"store_interval": "1s",
},
"historys": {
"enabled": true,
},

View File

@@ -117,6 +117,11 @@
"store_interval": "1s",
},
"thresholds": {
"enabled": true,
"store_interval": "1s",
},
"historys": {
"enabled": true,

View File

@@ -82,6 +82,13 @@
},
"thresholds": {
"enabled": true,
"store_interval": "1s",
},
"sm_generic": {
"enabled": true,
},

View File

@@ -2091,10 +2091,9 @@ func testOnStorITCRUDThresholdProfile(t *testing.T) {
func testOnStorITCRUDThreshold(t *testing.T) {
res := &Threshold{
Tenant: "cgrates.org",
ID: "TH1",
LastExecuted: time.Date(2016, 10, 1, 0, 0, 0, 0, time.UTC).Local(),
WakeupTime: time.Date(2016, 10, 1, 0, 0, 0, 0, time.UTC).Local(),
Tenant: "cgrates.org",
ID: "TH1",
Snooze: time.Date(2016, 10, 1, 0, 0, 0, 0, time.UTC).Local(),
}
if _, rcvErr := onStor.GetThreshold("cgrates.org", "TH1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound {
t.Error(rcvErr)

View File

@@ -121,23 +121,23 @@ func (ts Thresholds) Sort() {
}
func NewThresholdService(dm *DataManager, filteredFields []string, storeInterval time.Duration,
statS rpcclient.RpcClientConnection) (tS *ThresholdService, err error) {
statS *rpcclient.RpcClientPool) (tS *ThresholdService, err error) {
return &ThresholdService{dm: dm,
filterFields: filterFields,
storeInterval: storeInterval,
statS: statS,
stopBackup: make(chan struct{})}, nil
filteredFields: filteredFields,
storeInterval: storeInterval,
statS: statS,
stopBackup: make(chan struct{})}, nil
}
// ThresholdService manages Threshold execution and storing them to dataDB
type ThresholdService struct {
dm *DataManager
filterFields []string // fields considered when searching for matching thresholds
storeInterval time.Duration
statS rpcclient.RpcClientConnection // allows applying filters based on stats
stopBackup chan struct{}
storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool
stMux sync.RWMutex // protects storedTdIDs
dm *DataManager
filteredFields []string // fields considered when searching for matching thresholds
storeInterval time.Duration
statS *rpcclient.RpcClientPool // allows applying filters based on stats
stopBackup chan struct{}
storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool
stMux sync.RWMutex // protects storedTdIDs
}
// Called to start the service
@@ -148,6 +148,15 @@ func (tS *ThresholdService) ListenAndServe(exitChan chan bool) error {
return nil
}
// Shutdown is called to shutdown the service
func (tS *ThresholdService) Shutdown() error {
utils.Logger.Info("<ThresholdS> shutdown initialized")
close(tS.stopBackup)
tS.storeThresholds()
utils.Logger.Info("<ThresholdS> shutdown complete")
return nil
}
// backup will regularly store resources changed to dataDB
func (tS *ThresholdService) runBackup() {
if tS.storeInterval <= 0 {