Added ThresholdS as service in ServiceManager

This commit is contained in:
Trial97
2019-09-16 14:01:16 +03:00
committed by Dan Christian Bogos
parent 4ce5caea76
commit 8c826e1e33
8 changed files with 296 additions and 65 deletions

View File

@@ -904,38 +904,6 @@ func startStatService(internalStatSChan, internalThresholdSChan,
internalStatSChan <- stsV1
}
// startThresholdService fires up the ThresholdS
func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnection,
cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager,
server *utils.Server, filterSChan chan *engine.FilterS, exitChan chan bool) {
filterS := <-filterSChan
filterSChan <- filterS
<-cacheS.GetPrecacheChannel(utils.CacheThresholdProfiles)
<-cacheS.GetPrecacheChannel(utils.CacheThresholds)
<-cacheS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)
tS, err := engine.NewThresholdService(dm, cfg.ThresholdSCfg().StringIndexedFields,
cfg.ThresholdSCfg().PrefixIndexedFields, cfg.ThresholdSCfg().StoreInterval, filterS)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<ThresholdS> Could not init, error: %s", err.Error()))
exitChan <- true
return
}
go func() {
if err := tS.ListenAndServe(exitChan); err != nil {
utils.Logger.Crit(fmt.Sprintf("<ThresholdS> Error: %s listening for packets", err.Error()))
}
tS.Shutdown()
exitChan <- true
return
}()
tSv1 := v1.NewThresholdSv1(tS)
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(tSv1)
}
internalThresholdSChan <- tSv1
}
// startSupplierService fires up the SupplierS
func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan,
internalAttrSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
@@ -1606,9 +1574,11 @@ func main() {
loadDb, filterSChan, server, internalDispatcherSChan, exitChan)
attrS := services.NewAttributeService()
chrS := services.NewChargerService()
srvManager.AddService(attrS, chrS)
tS := services.NewThresholdService()
srvManager.AddService(attrS, chrS, tS)
internalAttributeSChan = attrS.GetIntenternalChan()
internalChargerSChan = chrS.GetIntenternalChan()
internalThresholdSChan = tS.GetIntenternalChan()
go srvManager.StartServices()
initServiceManagerV1(internalServeManagerChan, srvManager, server)
@@ -1731,11 +1701,6 @@ func main() {
filterSChan, exitChan)
}
if cfg.ThresholdSCfg().Enabled {
go startThresholdService(internalThresholdSChan, cacheS,
cfg, dm, server, filterSChan, exitChan)
}
if cfg.SupplierSCfg().Enabled {
go startSupplierService(internalSupplierSChan, internalRsChan,
internalStatSChan, internalAttributeSChan, internalDispatcherSChan,

View File

@@ -1172,7 +1172,10 @@ func (cfg *CGRConfig) StatSCfg() *StatSCfg {
return cfg.statsCfg
}
// ThresholdSCfg returns the config for ThresholdS
func (cfg *CGRConfig) ThresholdSCfg() *ThresholdSCfg {
cfg.lks[THRESHOLDS_JSON].Lock()
defer cfg.lks[THRESHOLDS_JSON].Unlock()
return cfg.thresholdSCfg
}
@@ -1405,6 +1408,7 @@ func (cfg *CGRConfig) unlockSections() {
}
}
// V1ReloadConfig reloads the configuration
func (cfg *CGRConfig) V1ReloadConfig(args *ConfigReloadWithArgDispatcher, reply *string) (err error) {
if missing := utils.MissingStructFields(args, []string{"Path"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
@@ -1573,6 +1577,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
}
fallthrough
case THRESHOLDS_JSON:
cfg.rldChans[THRESHOLDS_JSON] <- struct{}{}
if !fall {
break
}

View File

@@ -138,6 +138,31 @@ func TestCGRConfigReloadChargerS(t *testing.T) {
}
}
func TestCGRConfigReloadThresholdS(t *testing.T) {
cfg, err := NewDefaultCGRConfig()
if err != nil {
t.Fatal(err)
}
var reply string
if err = cfg.V1ReloadConfig(&ConfigReloadWithArgDispatcher{
Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo2"),
Section: THRESHOLDS_JSON,
}, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Expected OK received: %s", reply)
}
expAttr := &ThresholdSCfg{
Enabled: true,
StringIndexedFields: &[]string{utils.Account},
PrefixIndexedFields: &[]string{},
IndexedSelects: true,
}
if !reflect.DeepEqual(expAttr, cfg.ThresholdSCfg()) {
t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.ThresholdSCfg()))
}
}
func TestCgrCfgV1ReloadConfigSection(t *testing.T) {
for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out"} {
if err := os.RemoveAll(dir); err != nil {

View File

@@ -114,27 +114,24 @@ func (ts Thresholds) Sort() {
sort.Slice(ts, func(i, j int) bool { return ts[i].tPrfl.Weight > ts[j].tPrfl.Weight })
}
func NewThresholdService(dm *DataManager, stringIndexedFields, prefixIndexedFields *[]string, storeInterval time.Duration,
filterS *FilterS) (tS *ThresholdService, err error) {
func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) (tS *ThresholdService, err error) {
return &ThresholdService{dm: dm,
stringIndexedFields: stringIndexedFields,
prefixIndexedFields: prefixIndexedFields,
storeInterval: storeInterval,
filterS: filterS,
stopBackup: make(chan struct{}),
storedTdIDs: make(utils.StringMap)}, nil
cgrcfg: cgrcfg,
filterS: filterS,
stopBackup: make(chan struct{}),
loopStoped: make(chan struct{}),
storedTdIDs: make(utils.StringMap)}, nil
}
// ThresholdService manages Threshold execution and storing them to dataDB
type ThresholdService struct {
dm *DataManager
stringIndexedFields *[]string // fields considered when searching for matching thresholds
prefixIndexedFields *[]string
storeInterval time.Duration
filterS *FilterS
stopBackup chan struct{}
storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool
stMux sync.RWMutex // protects storedTdIDs
dm *DataManager
cgrcfg *config.CGRConfig
filterS *FilterS
stopBackup chan struct{}
loopStoped chan struct{}
storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool
stMux sync.RWMutex // protects storedTdIDs
}
// Called to start the service
@@ -157,17 +154,19 @@ func (tS *ThresholdService) Shutdown() error {
// backup will regularly store resources changed to dataDB
func (tS *ThresholdService) runBackup() {
if tS.storeInterval <= 0 {
storeInterval := tS.cgrcfg.ThresholdSCfg().StoreInterval
if storeInterval <= 0 {
tS.loopStoped <- struct{}{}
return
}
for {
tS.storeThresholds()
select {
case <-tS.stopBackup:
tS.loopStoped <- struct{}{}
return
default:
case <-time.After(storeInterval):
}
tS.storeThresholds()
time.Sleep(tS.storeInterval)
}
}
@@ -224,8 +223,8 @@ func (tS *ThresholdService) matchingThresholdsForEvent(args *ArgsProcessEvent) (
if len(args.ThresholdIDs) != 0 {
tIDs = args.ThresholdIDs
} else {
tIDsMap, err := MatchingItemIDsForEvent(args.Event, tS.stringIndexedFields,
tS.prefixIndexedFields, tS.dm, utils.CacheThresholdFilterIndexes,
tIDsMap, err := MatchingItemIDsForEvent(args.Event, tS.cgrcfg.ThresholdSCfg().StringIndexedFields,
tS.cgrcfg.ThresholdSCfg().PrefixIndexedFields, tS.dm, utils.CacheThresholdFilterIndexes,
args.Tenant, tS.filterS.cfg.ThresholdSCfg().IndexedSelects)
if err != nil {
return nil, err
@@ -320,7 +319,7 @@ func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (thresholdsIDs
}
t.Snooze = time.Now().Add(t.tPrfl.MinSleep)
// recurrent threshold
if tS.storeInterval == -1 {
if tS.cgrcfg.ThresholdSCfg().StoreInterval == -1 {
tS.StoreThreshold(t)
} else {
*t.dirty = true // mark it to be saved
@@ -398,3 +397,16 @@ func (tS *ThresholdService) V1GetThreshold(tntID *utils.TenantID, t *Threshold)
}
return
}
// Reload stops the backupLoop and restarts it
func (tS *ThresholdService) Reload() {
close(tS.stopBackup)
<-tS.loopStoped // wait until the loop is done
tS.stopBackup = make(chan struct{})
go tS.runBackup()
}
// StartLoop starts the gorutine with the backup loop
func (tS *ThresholdService) StartLoop() {
go tS.runBackup()
}

View File

@@ -148,9 +148,10 @@ func TestThresholdsPopulateThresholdService(t *testing.T) {
if err != nil {
t.Errorf("Error: %+v", err)
}
thServ, err = NewThresholdService(dmTH, nil, nil, 0,
&FilterS{dm: dmTH, cfg: defaultCfg})
defaultCfg.ThresholdSCfg().StoreInterval = 0
defaultCfg.ThresholdSCfg().StringIndexedFields = nil
defaultCfg.ThresholdSCfg().PrefixIndexedFields = nil
thServ, err = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg})
if err != nil {
t.Errorf("Error: %+v", err)
}

106
services/thresholds.go Normal file
View File

@@ -0,0 +1,106 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"fmt"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewThresholdService returns the Threshold Service
func NewThresholdService() servmanager.Service {
return &ThresholdService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
}
}
// ThresholdService implements Service interface
type ThresholdService struct {
thrs *engine.ThresholdService
rpc *v1.ThresholdSv1
connChan chan rpcclient.RpcClientConnection
}
// Start should handle the sercive start
func (thrs *ThresholdService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
if thrs.IsRunning() {
return fmt.Errorf("service aleady running")
}
if waitCache {
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheThresholdProfiles)
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheThresholds)
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheThresholdFilterIndexes)
}
thrs.thrs, err = engine.NewThresholdService(sp.GetDM(), sp.GetConfig(), sp.GetFilterS())
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ThresholdS, err.Error()))
return
}
thrs.thrs.StartLoop()
thrs.rpc = v1.NewThresholdSv1(thrs.thrs)
if !sp.GetConfig().DispatcherSCfg().Enabled {
sp.GetServer().RpcRegister(thrs.rpc)
}
thrs.connChan <- thrs.rpc
return
}
// GetIntenternalChan returns the internal connection chanel
func (thrs *ThresholdService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
return thrs.connChan
}
// Reload handles the change of config
func (thrs *ThresholdService) Reload(sp servmanager.ServiceProvider) (err error) {
thrs.thrs.Reload()
return
}
// Shutdown stops the service
func (thrs *ThresholdService) Shutdown() (err error) {
if err = thrs.thrs.Shutdown(); err != nil {
return
}
thrs.thrs = nil
thrs.rpc = nil
<-thrs.connChan
return
}
// GetRPCInterface returns the interface to register for server
func (thrs *ThresholdService) GetRPCInterface() interface{} {
return thrs.rpc
}
// IsRunning returns if the service is running
func (thrs *ThresholdService) IsRunning() bool {
return thrs != nil && thrs.thrs != nil
}
// ServiceName returns the service name
func (thrs *ThresholdService) ServiceName() string {
return utils.ThresholdS
}

View File

@@ -0,0 +1,79 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"path"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
func TestThresholdSReload(t *testing.T) {
// utils.Logger.SetLogLevel(7)
cfg, err := config.NewDefaultCGRConfig()
if err != nil {
t.Fatal(err)
}
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
engineShutdown := make(chan bool, 1)
chS := engine.NewCacheS(cfg, nil)
close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles))
close(chS.GetPrecacheChannel(utils.CacheThresholds))
close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes))
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
chS /*cdrStorage*/, nil,
/*loadStorage*/ nil, filterSChan,
server, nil, engineShutdown)
tS := NewThresholdService()
srvMngr.AddService(tS)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}
if tS.IsRunning() {
t.Errorf("Expected service to be down")
}
var reply string
if err = cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{
Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"),
Section: config.THRESHOLDS_JSON,
}, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Expecting OK ,received %s", reply)
}
time.Sleep(10 * time.Millisecond) //need to switch to gorutine
if !tS.IsRunning() {
t.Errorf("Expected service to be running")
}
cfg.ThresholdSCfg().Enabled = false
cfg.GetReloadChan(config.THRESHOLDS_JSON) <- struct{}{}
time.Sleep(10 * time.Millisecond)
if tS.IsRunning() {
t.Errorf("Expected service to be down")
}
engineShutdown <- true
}

View File

@@ -306,7 +306,17 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
}
}()
}
if srvMngr.cfg.ThresholdSCfg().Enabled {
go func() {
if chrS, has := srvMngr.subsystems[utils.ThresholdS]; !has {
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS))
srvMngr.engineShutdown <- true
} else if err = chrS.Start(srvMngr, true); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ChargerS, err))
srvMngr.engineShutdown <- true
}
}()
}
// startServer()
return
}
@@ -384,6 +394,34 @@ func (srvMngr *ServiceManager) handleReload() {
return // stop if we encounter an error
}
}
case <-srvMngr.cfg.GetReloadChan(config.THRESHOLDS_JSON):
tS, has := srvMngr.subsystems[utils.ThresholdS]
if !has {
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ThresholdS))
srvMngr.engineShutdown <- true
return // stop if we encounter an error
}
if srvMngr.cfg.ThresholdSCfg().Enabled {
if tS.IsRunning() {
if err = tS.Reload(srvMngr); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Failed to reload <%s>", utils.ServiceManager, utils.ThresholdS))
srvMngr.engineShutdown <- true
return // stop if we encounter an error
}
} else {
if err = tS.Start(srvMngr, true); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ThresholdS))
srvMngr.engineShutdown <- true
return // stop if we encounter an error
}
}
} else if tS.IsRunning() {
if err = tS.Shutdown(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Failed to stop service <%s>", utils.ServiceManager, utils.ThresholdS))
srvMngr.engineShutdown <- true
return // stop if we encounter an error
}
}
}
// handle RPC server
}