diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index a11eabc8e..02c8f27eb 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -851,59 +851,6 @@ func startResourceService(internalRsChan, internalThresholdSChan, internalRsChan <- rsV1 } -// startStatService fires up the StatS -func startStatService(internalStatSChan, internalThresholdSChan, - internalDispatcherSChan chan rpcclient.RpcClientConnection, - cacheS *engine.CacheS, cfg *config.CGRConfig, - dm *engine.DataManager, server *utils.Server, - filterSChan chan *engine.FilterS, exitChan chan bool) { - var err error - var thdSConn rpcclient.RpcClientConnection - filterS := <-filterSChan - filterSChan <- filterS - intThresholdSChan := internalThresholdSChan - if cfg.DispatcherSCfg().Enabled { - intThresholdSChan = internalDispatcherSChan - } - if len(cfg.StatSCfg().ThresholdSConns) != 0 { // Stats connection init - thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.StatSCfg().ThresholdSConns, intThresholdSChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to ThresholdS: %s", err.Error())) - exitChan <- true - return - } - } - <-cacheS.GetPrecacheChannel(utils.CacheStatQueueProfiles) - <-cacheS.GetPrecacheChannel(utils.CacheStatQueues) - <-cacheS.GetPrecacheChannel(utils.CacheStatFilterIndexes) - - sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval, - thdSConn, filterS, cfg.StatSCfg().StringIndexedFields, cfg.StatSCfg().PrefixIndexedFields) - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) - exitChan <- true - return - } - go func() { - if err := sS.ListenAndServe(exitChan); err != nil { - utils.Logger.Crit(fmt.Sprintf(" Error: %s listening for packets", err.Error())) - } - sS.Shutdown() - exitChan <- true - return - }() - stsV1 := v1.NewStatSv1(sS) - if !cfg.DispatcherSCfg().Enabled { - server.RpcRegister(stsV1) - } - internalStatSChan <- stsV1 -} - // startSupplierService fires up the SupplierS func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan, internalAttrSChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, @@ -1575,10 +1522,12 @@ func main() { attrS := services.NewAttributeService() chrS := services.NewChargerService() tS := services.NewThresholdService() - srvManager.AddService(attrS, chrS, tS) + stS := services.NewStatService() + srvManager.AddService(attrS, chrS, tS, stS) internalAttributeSChan = attrS.GetIntenternalChan() internalChargerSChan = chrS.GetIntenternalChan() internalThresholdSChan = tS.GetIntenternalChan() + internalStatSChan = stS.GetIntenternalChan() go srvManager.StartServices() initServiceManagerV1(internalServeManagerChan, srvManager, server) @@ -1695,12 +1644,6 @@ func main() { filterSChan, exitChan) } - if cfg.StatSCfg().Enabled { - go startStatService(internalStatSChan, internalThresholdSChan, - internalDispatcherSChan, cacheS, cfg, dm, server, - filterSChan, exitChan) - } - if cfg.SupplierSCfg().Enabled { go startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan, internalAttributeSChan, internalDispatcherSChan, diff --git a/config/config.go b/config/config.go index c804cc323..7e43c1ab1 100755 --- a/config/config.go +++ b/config/config.go @@ -1167,8 +1167,10 @@ func (self *CGRConfig) ResourceSCfg() *ResourceSConfig { return self.resourceSCfg } -// ToDo: fix locking -func (cfg *CGRConfig) StatSCfg() *StatSCfg { +// StatSCfg returns the config for StatS +func (cfg *CGRConfig) StatSCfg() *StatSCfg { // not done + cfg.lks[STATS_JSON].Lock() + defer cfg.lks[STATS_JSON].Unlock() return cfg.statsCfg } @@ -1572,6 +1574,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case STATS_JSON: + cfg.rldChans[STATS_JSON] <- struct{}{} if !fall { break } diff --git a/config/config_it_test.go b/config/config_it_test.go index f7b976ade..452499346 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -163,6 +163,34 @@ func TestCGRConfigReloadThresholdS(t *testing.T) { } } +func TestCGRConfigReloadStatS(t *testing.T) { + cfg, err := NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + var reply string + if err = cfg.V1ReloadConfig(&ConfigReloadWithArgDispatcher{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo2"), + Section: STATS_JSON, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected OK received: %s", reply) + } + expAttr := &StatSCfg{ + Enabled: true, + StringIndexedFields: &[]string{utils.Account}, + PrefixIndexedFields: &[]string{}, + IndexedSelects: true, + ThresholdSConns: []*RemoteHost{ + &RemoteHost{Address: "127.0.0.1:2012", Transport: utils.MetaJSONrpc}, + }, + } + if !reflect.DeepEqual(expAttr, cfg.StatSCfg()) { + t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.StatSCfg())) + } +} + func TestCgrCfgV1ReloadConfigSection(t *testing.T) { for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out"} { if err := os.RemoveAll(dir); err != nil { diff --git a/engine/stats.go b/engine/stats.go index 7bbc848bf..bf020a924 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -32,33 +32,31 @@ import ( ) // NewStatService initializes a StatService -func NewStatService(dm *DataManager, storeInterval time.Duration, - thdS rpcclient.RpcClientConnection, filterS *FilterS, stringIndexedFields, prefixIndexedFields *[]string) (ss *StatService, err error) { +func NewStatService(dm *DataManager, cgrcfg *config.CGRConfig, + thdS rpcclient.RpcClientConnection, filterS *FilterS) (ss *StatService, err error) { if thdS != nil && reflect.ValueOf(thdS).IsNil() { // fix nil value in interface thdS = nil } return &StatService{ - dm: dm, - storeInterval: storeInterval, - thdS: thdS, - filterS: filterS, - stringIndexedFields: stringIndexedFields, - prefixIndexedFields: prefixIndexedFields, - storedStatQueues: make(utils.StringMap), - stopBackup: make(chan struct{})}, nil + dm: dm, + thdS: thdS, + filterS: filterS, + cgrcfg: cgrcfg, + storedStatQueues: make(utils.StringMap), + loopStoped: make(chan struct{}), + stopBackup: make(chan struct{})}, nil } // StatService builds stats for events type StatService struct { - dm *DataManager - storeInterval time.Duration - thdS rpcclient.RpcClientConnection // rpc connection towards ThresholdS - filterS *FilterS - stringIndexedFields *[]string - prefixIndexedFields *[]string - stopBackup chan struct{} - storedStatQueues utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool - ssqMux sync.RWMutex // protects storedStatQueues + dm *DataManager + thdS rpcclient.RpcClientConnection // rpc connection towards ThresholdS + filterS *FilterS + cgrcfg *config.CGRConfig + loopStoped chan struct{} + stopBackup chan struct{} + storedStatQueues utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool + ssqMux sync.RWMutex // protects storedStatQueues } // ListenAndServe loops keeps the service alive @@ -81,17 +79,19 @@ func (sS *StatService) Shutdown() error { // runBackup will regularly store resources changed to dataDB func (sS *StatService) runBackup() { - if sS.storeInterval <= 0 { + storeInterval := sS.cgrcfg.StatSCfg().StoreInterval + if storeInterval <= 0 { + sS.loopStoped <- struct{}{} return } for { + sS.storeStats() select { case <-sS.stopBackup: + sS.loopStoped <- struct{}{} return - default: + case <-time.After(storeInterval): } - sS.storeStats() - time.Sleep(sS.storeInterval) } } @@ -160,7 +160,7 @@ func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) ( if len(args.StatIDs) != 0 { sqIDs = args.StatIDs } else { - mapIDs, err := MatchingItemIDsForEvent(args.Event, sS.stringIndexedFields, sS.prefixIndexedFields, + mapIDs, err := MatchingItemIDsForEvent(args.Event, sS.cgrcfg.StatSCfg().StringIndexedFields, sS.cgrcfg.StatSCfg().PrefixIndexedFields, sS.dm, utils.CacheStatFilterIndexes, args.Tenant, sS.filterS.cfg.StatSCfg().IndexedSelects) if err != nil { return nil, err @@ -257,8 +257,8 @@ func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs [ sq.TenantID(), args.TenantID(), err.Error())) withErrors = true } - if sS.storeInterval != 0 && sq.dirty != nil { // don't save - if sS.storeInterval == -1 { + if sS.cgrcfg.StatSCfg().StoreInterval != 0 && sq.dirty != nil { // don't save + if sS.cgrcfg.StatSCfg().StoreInterval == -1 { sS.StoreStatQueue(sq) } else { *sq.dirty = true // mark it to be saved @@ -409,3 +409,16 @@ func (sS *StatService) V1GetQueueIDs(tenant string, qIDs *[]string) (err error) *qIDs = retIDs return } + +// Reload stops the backupLoop and restarts it +func (sS *StatService) Reload() { + close(sS.stopBackup) + <-sS.loopStoped // wait until the loop is done + sS.stopBackup = make(chan struct{}) + go sS.runBackup() +} + +// StartLoop starsS the gorutine with the backup loop +func (sS *StatService) StartLoop() { + go sS.runBackup() +} diff --git a/engine/stats_test.go b/engine/stats_test.go index daeb772b8..fecb66d0b 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -149,8 +149,11 @@ func TestStatQueuesPopulateService(t *testing.T) { t.Errorf("Error: %+v", err) } - statService, err = NewStatService(dmSTS, time.Duration(1), - nil, &FilterS{dm: dmSTS, cfg: defaultCfg}, nil, nil) + defaultCfg.StatSCfg().StoreInterval = 1 + defaultCfg.StatSCfg().StringIndexedFields = nil + defaultCfg.StatSCfg().PrefixIndexedFields = nil + statService, err = NewStatService(dmSTS, defaultCfg, + nil, &FilterS{dm: dmSTS, cfg: defaultCfg}) if err != nil { t.Errorf("Error: %+v", err) } diff --git a/services/stats.go b/services/stats.go new file mode 100644 index 000000000..3b5288dc1 --- /dev/null +++ b/services/stats.go @@ -0,0 +1,111 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + + v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewStatService returns the Stat Service +func NewStatService() servmanager.Service { + return &StatService{ + connChan: make(chan rpcclient.RpcClientConnection, 1), + } +} + +// StatService implements Service interface +type StatService struct { + sts *engine.StatService + rpc *v1.StatSv1 + connChan chan rpcclient.RpcClientConnection +} + +// Start should handle the sercive start +func (sts *StatService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { + if sts.IsRunning() { + return fmt.Errorf("service aleady running") + } + if waitCache { + <-sp.GetCacheS().GetPrecacheChannel(utils.CacheStatQueueProfiles) + <-sp.GetCacheS().GetPrecacheChannel(utils.CacheStatQueues) + <-sp.GetCacheS().GetPrecacheChannel(utils.CacheStatFilterIndexes) + } + + var thdSConn rpcclient.RpcClientConnection + if thdSConn, err = sp.GetConnection(utils.ThresholdS, sp.GetConfig().StatSCfg().ThresholdSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.StatS, err.Error())) + return + } + sts.sts, err = engine.NewStatService(sp.GetDM(), sp.GetConfig(), thdSConn, sp.GetFilterS()) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) + return + } + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.StatS)) + sts.sts.StartLoop() + sts.rpc = v1.NewStatSv1(sts.sts) + if !sp.GetConfig().DispatcherSCfg().Enabled { + sp.GetServer().RpcRegister(sts.rpc) + } + sts.connChan <- sts.rpc + return +} + +// GetIntenternalChan returns the internal connection chanel +func (sts *StatService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return sts.connChan +} + +// Reload handles the change of config +func (sts *StatService) Reload(sp servmanager.ServiceProvider) (err error) { + sts.sts.Reload() + return +} + +// Shutdown stops the service +func (sts *StatService) Shutdown() (err error) { + if err = sts.sts.Shutdown(); err != nil { + return + } + sts.sts = nil + sts.rpc = nil + <-sts.connChan + return +} + +// GetRPCInterface returns the interface to register for server +func (sts *StatService) GetRPCInterface() interface{} { + return sts.rpc +} + +// IsRunning returns if the service is running +func (sts *StatService) IsRunning() bool { + return sts != nil && sts.sts != nil +} + +// ServiceName returns the service name +func (sts *StatService) ServiceName() string { + return utils.StatS +} diff --git a/services/stats_it_test.go b/services/stats_it_test.go new file mode 100644 index 000000000..71396ac78 --- /dev/null +++ b/services/stats_it_test.go @@ -0,0 +1,83 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package services + +import ( + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +func TestStatSReload(t *testing.T) { + // utils.Logger.SetLogLevel(7) + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + cfg.ThresholdSCfg().Enabled = true + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + engineShutdown := make(chan bool, 1) + chS := engine.NewCacheS(cfg, nil) + close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) + close(chS.GetPrecacheChannel(utils.CacheThresholds)) + close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) + close(chS.GetPrecacheChannel(utils.CacheStatQueueProfiles)) + close(chS.GetPrecacheChannel(utils.CacheStatQueues)) + close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes)) + server := utils.NewServer() + srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, + chS /*cdrStorage*/, nil, + /*loadStorage*/ nil, filterSChan, + server, nil, engineShutdown) + sS := NewStatService() + srvMngr.AddService(NewThresholdService(), sS) + if err = srvMngr.StartServices(); err != nil { + t.Error(err) + } + if sS.IsRunning() { + t.Errorf("Expected service to be down") + } + var reply string + if err = cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"), + Section: config.STATS_JSON, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expecting OK ,received %s", reply) + } + time.Sleep(10 * time.Millisecond) //need to switch to gorutine + if !sS.IsRunning() { + t.Errorf("Expected service to be running") + } + cfg.StatSCfg().Enabled = false + cfg.GetReloadChan(config.STATS_JSON) <- struct{}{} + time.Sleep(10 * time.Millisecond) + if sS.IsRunning() { + t.Errorf("Expected service to be down") + } + engineShutdown <- true +} diff --git a/services/thresholds.go b/services/thresholds.go index 41a71371f..9218edc51 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -59,6 +59,7 @@ func (thrs *ThresholdService) Start(sp servmanager.ServiceProvider, waitCache bo utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ThresholdS, err.Error())) return } + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS)) thrs.thrs.StartLoop() thrs.rpc = v1.NewThresholdSv1(thrs.thrs) if !sp.GetConfig().DispatcherSCfg().Enabled { diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 759ea2457..fecd04fdb 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -309,10 +309,21 @@ func (srvMngr *ServiceManager) StartServices() (err error) { if srvMngr.cfg.ThresholdSCfg().Enabled { go func() { if chrS, has := srvMngr.subsystems[utils.ThresholdS]; !has { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS)) + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ThresholdS)) srvMngr.engineShutdown <- true } else if err = chrS.Start(srvMngr, true); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ChargerS, err)) + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ThresholdS, err)) + srvMngr.engineShutdown <- true + } + }() + } + if srvMngr.cfg.StatSCfg().Enabled { + go func() { + if chrS, has := srvMngr.subsystems[utils.StatS]; !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.StatS)) + srvMngr.engineShutdown <- true + } else if err = chrS.Start(srvMngr, true); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.StatS, err)) srvMngr.engineShutdown <- true } }() @@ -422,6 +433,34 @@ func (srvMngr *ServiceManager) handleReload() { return // stop if we encounter an error } } + case <-srvMngr.cfg.GetReloadChan(config.STATS_JSON): + tS, has := srvMngr.subsystems[utils.StatS] + if !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.StatS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + if srvMngr.cfg.StatSCfg().Enabled { + if tS.IsRunning() { + if err = tS.Reload(srvMngr); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to reload <%s>", utils.ServiceManager, utils.StatS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + } else { + if err = tS.Start(srvMngr, true); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.StatS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + } + } else if tS.IsRunning() { + if err = tS.Shutdown(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to stop service <%s>", utils.ServiceManager, utils.StatS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + } } // handle RPC server }