diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 08f8302a2..a11eabc8e 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -904,38 +904,6 @@ func startStatService(internalStatSChan, internalThresholdSChan, internalStatSChan <- stsV1 } -// startThresholdService fires up the ThresholdS -func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnection, - cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager, - server *utils.Server, filterSChan chan *engine.FilterS, exitChan chan bool) { - filterS := <-filterSChan - filterSChan <- filterS - <-cacheS.GetPrecacheChannel(utils.CacheThresholdProfiles) - <-cacheS.GetPrecacheChannel(utils.CacheThresholds) - <-cacheS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes) - - tS, err := engine.NewThresholdService(dm, cfg.ThresholdSCfg().StringIndexedFields, - cfg.ThresholdSCfg().PrefixIndexedFields, cfg.ThresholdSCfg().StoreInterval, filterS) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) - exitChan <- true - return - } - go func() { - if err := tS.ListenAndServe(exitChan); err != nil { - utils.Logger.Crit(fmt.Sprintf(" Error: %s listening for packets", err.Error())) - } - tS.Shutdown() - exitChan <- true - return - }() - tSv1 := v1.NewThresholdSv1(tS) - if !cfg.DispatcherSCfg().Enabled { - server.RpcRegister(tSv1) - } - internalThresholdSChan <- tSv1 -} - // startSupplierService fires up the SupplierS func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan, internalAttrSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, @@ -1606,9 +1574,11 @@ func main() { loadDb, filterSChan, server, internalDispatcherSChan, exitChan) attrS := services.NewAttributeService() chrS := services.NewChargerService() - srvManager.AddService(attrS, chrS) + tS := services.NewThresholdService() + srvManager.AddService(attrS, chrS, tS) internalAttributeSChan = attrS.GetIntenternalChan() internalChargerSChan = chrS.GetIntenternalChan() + internalThresholdSChan = tS.GetIntenternalChan() go srvManager.StartServices() initServiceManagerV1(internalServeManagerChan, srvManager, server) @@ -1731,11 +1701,6 @@ func main() { filterSChan, exitChan) } - if cfg.ThresholdSCfg().Enabled { - go startThresholdService(internalThresholdSChan, cacheS, - cfg, dm, server, filterSChan, exitChan) - } - if cfg.SupplierSCfg().Enabled { go startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan, internalAttributeSChan, internalDispatcherSChan, diff --git a/config/config.go b/config/config.go index 31fe4ce12..c804cc323 100755 --- a/config/config.go +++ b/config/config.go @@ -1172,7 +1172,10 @@ func (cfg *CGRConfig) StatSCfg() *StatSCfg { return cfg.statsCfg } +// ThresholdSCfg returns the config for ThresholdS func (cfg *CGRConfig) ThresholdSCfg() *ThresholdSCfg { + cfg.lks[THRESHOLDS_JSON].Lock() + defer cfg.lks[THRESHOLDS_JSON].Unlock() return cfg.thresholdSCfg } @@ -1405,6 +1408,7 @@ func (cfg *CGRConfig) unlockSections() { } } +// V1ReloadConfig reloads the configuration func (cfg *CGRConfig) V1ReloadConfig(args *ConfigReloadWithArgDispatcher, reply *string) (err error) { if missing := utils.MissingStructFields(args, []string{"Path"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) @@ -1573,6 +1577,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case THRESHOLDS_JSON: + cfg.rldChans[THRESHOLDS_JSON] <- struct{}{} if !fall { break } diff --git a/config/config_it_test.go b/config/config_it_test.go index c22eb35ec..f7b976ade 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -138,6 +138,31 @@ func TestCGRConfigReloadChargerS(t *testing.T) { } } +func TestCGRConfigReloadThresholdS(t *testing.T) { + cfg, err := NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + var reply string + if err = cfg.V1ReloadConfig(&ConfigReloadWithArgDispatcher{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo2"), + Section: THRESHOLDS_JSON, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected OK received: %s", reply) + } + expAttr := &ThresholdSCfg{ + Enabled: true, + StringIndexedFields: &[]string{utils.Account}, + PrefixIndexedFields: &[]string{}, + IndexedSelects: true, + } + if !reflect.DeepEqual(expAttr, cfg.ThresholdSCfg()) { + t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.ThresholdSCfg())) + } +} + func TestCgrCfgV1ReloadConfigSection(t *testing.T) { for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out"} { if err := os.RemoveAll(dir); err != nil { diff --git a/engine/thresholds.go b/engine/thresholds.go index 3ff720cde..7e04b25b6 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -114,27 +114,24 @@ func (ts Thresholds) Sort() { sort.Slice(ts, func(i, j int) bool { return ts[i].tPrfl.Weight > ts[j].tPrfl.Weight }) } -func NewThresholdService(dm *DataManager, stringIndexedFields, prefixIndexedFields *[]string, storeInterval time.Duration, - filterS *FilterS) (tS *ThresholdService, err error) { +func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) (tS *ThresholdService, err error) { return &ThresholdService{dm: dm, - stringIndexedFields: stringIndexedFields, - prefixIndexedFields: prefixIndexedFields, - storeInterval: storeInterval, - filterS: filterS, - stopBackup: make(chan struct{}), - storedTdIDs: make(utils.StringMap)}, nil + cgrcfg: cgrcfg, + filterS: filterS, + stopBackup: make(chan struct{}), + loopStoped: make(chan struct{}), + storedTdIDs: make(utils.StringMap)}, nil } // ThresholdService manages Threshold execution and storing them to dataDB type ThresholdService struct { - dm *DataManager - stringIndexedFields *[]string // fields considered when searching for matching thresholds - prefixIndexedFields *[]string - storeInterval time.Duration - filterS *FilterS - stopBackup chan struct{} - storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool - stMux sync.RWMutex // protects storedTdIDs + dm *DataManager + cgrcfg *config.CGRConfig + filterS *FilterS + stopBackup chan struct{} + loopStoped chan struct{} + storedTdIDs utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool + stMux sync.RWMutex // protects storedTdIDs } // Called to start the service @@ -157,17 +154,19 @@ func (tS *ThresholdService) Shutdown() error { // backup will regularly store resources changed to dataDB func (tS *ThresholdService) runBackup() { - if tS.storeInterval <= 0 { + storeInterval := tS.cgrcfg.ThresholdSCfg().StoreInterval + if storeInterval <= 0 { + tS.loopStoped <- struct{}{} return } for { + tS.storeThresholds() select { case <-tS.stopBackup: + tS.loopStoped <- struct{}{} return - default: + case <-time.After(storeInterval): } - tS.storeThresholds() - time.Sleep(tS.storeInterval) } } @@ -224,8 +223,8 @@ func (tS *ThresholdService) matchingThresholdsForEvent(args *ArgsProcessEvent) ( if len(args.ThresholdIDs) != 0 { tIDs = args.ThresholdIDs } else { - tIDsMap, err := MatchingItemIDsForEvent(args.Event, tS.stringIndexedFields, - tS.prefixIndexedFields, tS.dm, utils.CacheThresholdFilterIndexes, + tIDsMap, err := MatchingItemIDsForEvent(args.Event, tS.cgrcfg.ThresholdSCfg().StringIndexedFields, + tS.cgrcfg.ThresholdSCfg().PrefixIndexedFields, tS.dm, utils.CacheThresholdFilterIndexes, args.Tenant, tS.filterS.cfg.ThresholdSCfg().IndexedSelects) if err != nil { return nil, err @@ -320,7 +319,7 @@ func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (thresholdsIDs } t.Snooze = time.Now().Add(t.tPrfl.MinSleep) // recurrent threshold - if tS.storeInterval == -1 { + if tS.cgrcfg.ThresholdSCfg().StoreInterval == -1 { tS.StoreThreshold(t) } else { *t.dirty = true // mark it to be saved @@ -398,3 +397,16 @@ func (tS *ThresholdService) V1GetThreshold(tntID *utils.TenantID, t *Threshold) } return } + +// Reload stops the backupLoop and restarts it +func (tS *ThresholdService) Reload() { + close(tS.stopBackup) + <-tS.loopStoped // wait until the loop is done + tS.stopBackup = make(chan struct{}) + go tS.runBackup() +} + +// StartLoop starts the gorutine with the backup loop +func (tS *ThresholdService) StartLoop() { + go tS.runBackup() +} diff --git a/engine/thresholds_test.go b/engine/thresholds_test.go index adc6c3937..e927e3a51 100644 --- a/engine/thresholds_test.go +++ b/engine/thresholds_test.go @@ -148,9 +148,10 @@ func TestThresholdsPopulateThresholdService(t *testing.T) { if err != nil { t.Errorf("Error: %+v", err) } - - thServ, err = NewThresholdService(dmTH, nil, nil, 0, - &FilterS{dm: dmTH, cfg: defaultCfg}) + defaultCfg.ThresholdSCfg().StoreInterval = 0 + defaultCfg.ThresholdSCfg().StringIndexedFields = nil + defaultCfg.ThresholdSCfg().PrefixIndexedFields = nil + thServ, err = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg}) if err != nil { t.Errorf("Error: %+v", err) } diff --git a/services/thresholds.go b/services/thresholds.go new file mode 100644 index 000000000..41a71371f --- /dev/null +++ b/services/thresholds.go @@ -0,0 +1,106 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + + v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewThresholdService returns the Threshold Service +func NewThresholdService() servmanager.Service { + return &ThresholdService{ + connChan: make(chan rpcclient.RpcClientConnection, 1), + } +} + +// ThresholdService implements Service interface +type ThresholdService struct { + thrs *engine.ThresholdService + rpc *v1.ThresholdSv1 + connChan chan rpcclient.RpcClientConnection +} + +// Start should handle the sercive start +func (thrs *ThresholdService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { + if thrs.IsRunning() { + return fmt.Errorf("service aleady running") + } + + if waitCache { + <-sp.GetCacheS().GetPrecacheChannel(utils.CacheThresholdProfiles) + <-sp.GetCacheS().GetPrecacheChannel(utils.CacheThresholds) + <-sp.GetCacheS().GetPrecacheChannel(utils.CacheThresholdFilterIndexes) + } + + thrs.thrs, err = engine.NewThresholdService(sp.GetDM(), sp.GetConfig(), sp.GetFilterS()) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ThresholdS, err.Error())) + return + } + thrs.thrs.StartLoop() + thrs.rpc = v1.NewThresholdSv1(thrs.thrs) + if !sp.GetConfig().DispatcherSCfg().Enabled { + sp.GetServer().RpcRegister(thrs.rpc) + } + thrs.connChan <- thrs.rpc + return +} + +// GetIntenternalChan returns the internal connection chanel +func (thrs *ThresholdService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return thrs.connChan +} + +// Reload handles the change of config +func (thrs *ThresholdService) Reload(sp servmanager.ServiceProvider) (err error) { + thrs.thrs.Reload() + return +} + +// Shutdown stops the service +func (thrs *ThresholdService) Shutdown() (err error) { + if err = thrs.thrs.Shutdown(); err != nil { + return + } + thrs.thrs = nil + thrs.rpc = nil + <-thrs.connChan + return +} + +// GetRPCInterface returns the interface to register for server +func (thrs *ThresholdService) GetRPCInterface() interface{} { + return thrs.rpc +} + +// IsRunning returns if the service is running +func (thrs *ThresholdService) IsRunning() bool { + return thrs != nil && thrs.thrs != nil +} + +// ServiceName returns the service name +func (thrs *ThresholdService) ServiceName() string { + return utils.ThresholdS +} diff --git a/services/thresholds_it_test.go b/services/thresholds_it_test.go new file mode 100644 index 000000000..a86cfe4de --- /dev/null +++ b/services/thresholds_it_test.go @@ -0,0 +1,79 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package services + +import ( + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +func TestThresholdSReload(t *testing.T) { + // utils.Logger.SetLogLevel(7) + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + engineShutdown := make(chan bool, 1) + chS := engine.NewCacheS(cfg, nil) + close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) + close(chS.GetPrecacheChannel(utils.CacheThresholds)) + close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) + server := utils.NewServer() + srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, + chS /*cdrStorage*/, nil, + /*loadStorage*/ nil, filterSChan, + server, nil, engineShutdown) + tS := NewThresholdService() + srvMngr.AddService(tS) + if err = srvMngr.StartServices(); err != nil { + t.Error(err) + } + if tS.IsRunning() { + t.Errorf("Expected service to be down") + } + var reply string + if err = cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"), + Section: config.THRESHOLDS_JSON, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expecting OK ,received %s", reply) + } + time.Sleep(10 * time.Millisecond) //need to switch to gorutine + if !tS.IsRunning() { + t.Errorf("Expected service to be running") + } + cfg.ThresholdSCfg().Enabled = false + cfg.GetReloadChan(config.THRESHOLDS_JSON) <- struct{}{} + time.Sleep(10 * time.Millisecond) + if tS.IsRunning() { + t.Errorf("Expected service to be down") + } + engineShutdown <- true +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 093def4ce..759ea2457 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -306,7 +306,17 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } }() } - + if srvMngr.cfg.ThresholdSCfg().Enabled { + go func() { + if chrS, has := srvMngr.subsystems[utils.ThresholdS]; !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS)) + srvMngr.engineShutdown <- true + } else if err = chrS.Start(srvMngr, true); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ChargerS, err)) + srvMngr.engineShutdown <- true + } + }() + } // startServer() return } @@ -384,6 +394,34 @@ func (srvMngr *ServiceManager) handleReload() { return // stop if we encounter an error } } + case <-srvMngr.cfg.GetReloadChan(config.THRESHOLDS_JSON): + tS, has := srvMngr.subsystems[utils.ThresholdS] + if !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ThresholdS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + if srvMngr.cfg.ThresholdSCfg().Enabled { + if tS.IsRunning() { + if err = tS.Reload(srvMngr); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to reload <%s>", utils.ServiceManager, utils.ThresholdS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + } else { + if err = tS.Start(srvMngr, true); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ThresholdS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + } + } else if tS.IsRunning() { + if err = tS.Shutdown(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to stop service <%s>", utils.ServiceManager, utils.ThresholdS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + } } // handle RPC server }