From da81afcdea7f179675c5468addab18a3106f4003 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 13 Sep 2019 17:41:38 +0300 Subject: [PATCH] Added ChargerS as service in ServiceManager --- cmd/cgr-engine/cgr-engine.go | 16 ++--- config/config.go | 3 + config/config_it_test.go | 33 +++++++++- integration_test.sh | 5 +- services/attributes.go | 1 + services/attributes_it_test.go | 18 ++++-- services/chargers.go | 111 +++++++++++++++++++++++++++++++++ services/chargers_it_test.go | 81 ++++++++++++++++++++++++ servmanager/servmanager.go | 64 ++++++++++++++++--- 9 files changed, 310 insertions(+), 22 deletions(-) create mode 100644 services/chargers.go create mode 100644 services/chargers_it_test.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index c65a3f66a..bef98c979 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -1692,10 +1692,12 @@ func main() { // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, dm, cacheS, cdrDb, - loadDb, filterSChan, server, exitChan) + loadDb, filterSChan, server, internalDispatcherSChan, exitChan) attrS := services.NewAttributeService() - srvManager.AddService(attrS) + chrS := services.NewChargerService() + srvManager.AddService(attrS, chrS) internalAttributeSChan = attrS.GetIntenternalChan() + internalChargerSChan = chrS.GetIntenternalChan() go srvManager.StartServices() initServiceManagerV1(internalServeManagerChan, srvManager, server) @@ -1809,11 +1811,11 @@ func main() { // go startAttributeService(internalAttributeSChan, cacheS, // cfg, dm, server, filterSChan, exitChan) // } - if cfg.ChargerSCfg().Enabled { - go startChargerService(internalChargerSChan, internalAttributeSChan, - internalDispatcherSChan, cacheS, cfg, dm, server, - filterSChan, exitChan) - } + // if cfg.ChargerSCfg().Enabled { + // go startChargerService(internalChargerSChan, internalAttributeSChan, + // internalDispatcherSChan, cacheS, cfg, dm, server, + // filterSChan, exitChan) + // } // Start RL service if cfg.ResourceSCfg().Enabled { diff --git a/config/config.go b/config/config.go index c0102f532..d2cda7aef 100755 --- a/config/config.go +++ b/config/config.go @@ -1155,7 +1155,10 @@ func (cfg *CGRConfig) AttributeSCfg() *AttributeSCfg { return cfg.attributeSCfg } +// ChargerSCfg returns the config for ChargerS func (cfg *CGRConfig) ChargerSCfg() *ChargerSCfg { + cfg.lks[ChargerSCfgJson].Lock() + defer cfg.lks[ChargerSCfgJson].Unlock() return cfg.chargerSCfg } diff --git a/config/config_it_test.go b/config/config_it_test.go index 1718b3038..21de6aea2 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -81,7 +81,7 @@ func TestNewCGRConfigFromPath(t *testing.T) { } } -func TestCGRConfigReload(t *testing.T) { +func TestCGRConfigReloadAttributeS(t *testing.T) { cfg, err := NewDefaultCGRConfig() if err != nil { t.Fatal(err) @@ -107,6 +107,37 @@ func TestCGRConfigReload(t *testing.T) { } } +func TestCGRConfigReloadChargerS(t *testing.T) { + cfg, err := NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + var reply string + if err = cfg.V1ReloadConfig(&ConfigReloadWithArgDispatcher{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo2"), + Section: ChargerSCfgJson, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected OK received: %s", reply) + } + expAttr := &ChargerSCfg{ + Enabled: true, + StringIndexedFields: &[]string{utils.Account}, + PrefixIndexedFields: &[]string{}, + IndexedSelects: true, + AttributeSConns: []*RemoteHost{ + &RemoteHost{ + Address: "127.0.0.1:2012", + Transport: utils.MetaJSONrpc, + }, + }, + } + if !reflect.DeepEqual(expAttr, cfg.ChargerSCfg()) { + t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.ChargerSCfg())) + } +} + func TestCgrCfgV1ReloadConfigSection(t *testing.T) { for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out"} { if err := os.RemoveAll(dir); err != nil { diff --git a/integration_test.sh b/integration_test.sh index 10fc89d5e..21c042bbb 100755 --- a/integration_test.sh +++ b/integration_test.sh @@ -41,8 +41,11 @@ dis=$? echo 'go test github.com/cgrates/cgrates/loaders -tags=integration' go test github.com/cgrates/cgrates/loaders -tags=integration lds=$? +echo 'go test github.com/cgrates/cgrates/services -tags=integration' +go test github.com/cgrates/cgrates/services -tags=integration +srv=$? echo 'go test github.com/cgrates/cgrates/apier/v1 -tags=offline' go test github.com/cgrates/cgrates/apier/v1 -tags=offline offline=$? -exit $gen && $ap1 && $ap2 && $en && $cdrc && $cfg && $utl && $gnr && $agts && $smg && $mgr && $dis && $lds && $ers && offline +exit $gen && $ap1 && $ap2 && $en && $cdrc && $cfg && $utl && $gnr && $agts && $smg && $mgr && $dis && $lds && $ers && $srv && $offline diff --git a/services/attributes.go b/services/attributes.go index 41b4191fe..ba0f45b29 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -28,6 +28,7 @@ import ( "github.com/cgrates/rpcclient" ) +// NewAttributeService returns the Attribute Service func NewAttributeService() servmanager.Service { return &AttributeService{ connChan: make(chan rpcclient.RpcClientConnection, 1), diff --git a/services/attributes_it_test.go b/services/attributes_it_test.go index 3113c7517..ec104d84b 100644 --- a/services/attributes_it_test.go +++ b/services/attributes_it_test.go @@ -41,13 +41,15 @@ func TestAttributeSReload(t *testing.T) { chS := engine.NewCacheS(cfg, nil) close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) + close(chS.GetPrecacheChannel(utils.CacheChargerProfiles)) + close(chS.GetPrecacheChannel(utils.CacheChargerFilterIndexes)) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, chS /*cdrStorage*/, nil, /*loadStorage*/ nil, filterSChan, - server, engineShutdown) + server, nil, engineShutdown) attrS := NewAttributeService() - srvMngr.AddService(attrS) + srvMngr.AddService(attrS, NewChargerService()) if err = srvMngr.StartServices(); err != nil { t.Error(err) } @@ -55,17 +57,21 @@ func TestAttributeSReload(t *testing.T) { t.Errorf("Expected service to be down") } var reply string - cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{ + if err := cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{ Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"), Section: config.ATTRIBUTE_JSN, - }, &reply) - time.Sleep(1) //need to switch to gorutine + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expecting OK ,received %s", reply) + } + time.Sleep(10 * time.Millisecond) //need to switch to gorutine if !attrS.IsRunning() { t.Errorf("Expected service to be running") } cfg.AttributeSCfg().Enabled = false cfg.GetReloadChan(config.ATTRIBUTE_JSN) <- struct{}{} - time.Sleep(1) + time.Sleep(10 * time.Millisecond) if attrS.IsRunning() { t.Errorf("Expected service to be down") } diff --git a/services/chargers.go b/services/chargers.go new file mode 100644 index 000000000..b44dad7d8 --- /dev/null +++ b/services/chargers.go @@ -0,0 +1,111 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + + v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewChargerService returns the Charger Service +func NewChargerService() servmanager.Service { + return &ChargerService{ + connChan: make(chan rpcclient.RpcClientConnection, 1), + } +} + +// ChargerService implements Service interface +type ChargerService struct { + chrS *engine.ChargerService + rpc *v1.ChargerSv1 + connChan chan rpcclient.RpcClientConnection +} + +// Start should handle the sercive start +func (chrS *ChargerService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { + if chrS.IsRunning() { + return fmt.Errorf("service aleady running") + } + if waitCache { + <-sp.GetCacheS().GetPrecacheChannel(utils.CacheChargerProfiles) + <-sp.GetCacheS().GetPrecacheChannel(utils.CacheChargerFilterIndexes) + } + var chrSConn rpcclient.RpcClientConnection + if chrSConn, err = sp.GetConnection(utils.AttributeS, sp.GetConfig().ChargerSCfg().AttributeSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.ChargerS, utils.AttributeS, err.Error())) + return + } + + if chrS.chrS, err = engine.NewChargerService(sp.GetDM(), sp.GetFilterS(), chrSConn, sp.GetConfig()); err != nil { + utils.Logger.Crit( + fmt.Sprintf("<%s> Could not init, error: %s", + utils.ChargerS, err.Error())) + return + } + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ChargerS)) + cSv1 := v1.NewChargerSv1(chrS.chrS) + if !sp.GetConfig().DispatcherSCfg().Enabled { + sp.GetServer().RpcRegister(cSv1) + } + chrS.connChan <- cSv1 + return +} + +// GetIntenternalChan returns the internal connection chanel +func (chrS *ChargerService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return chrS.connChan +} + +// Reload handles the change of config +func (chrS *ChargerService) Reload(sp servmanager.ServiceProvider) (err error) { + // need to reload the connection to the attributeS + return +} + +// Shutdown stops the service +func (chrS *ChargerService) Shutdown() (err error) { + if err = chrS.chrS.Shutdown(); err != nil { + return + } + chrS.chrS = nil + chrS.rpc = nil + <-chrS.connChan + return +} + +// GetRPCInterface returns the interface to register for server +func (chrS *ChargerService) GetRPCInterface() interface{} { + return chrS.rpc +} + +// IsRunning returns if the service is running +func (chrS *ChargerService) IsRunning() bool { + return chrS != nil && chrS.chrS != nil +} + +// ServiceName returns the service name +func (chrS *ChargerService) ServiceName() string { + return utils.ChargerS +} diff --git a/services/chargers_it_test.go b/services/chargers_it_test.go new file mode 100644 index 000000000..8f902f757 --- /dev/null +++ b/services/chargers_it_test.go @@ -0,0 +1,81 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package services + +import ( + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +func TestChargerSReload(t *testing.T) { + // utils.Logger.SetLogLevel(7) + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + cfg.AttributeSCfg().Enabled = true + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + engineShutdown := make(chan bool, 1) + chS := engine.NewCacheS(cfg, nil) + close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) + close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) + close(chS.GetPrecacheChannel(utils.CacheChargerProfiles)) + close(chS.GetPrecacheChannel(utils.CacheChargerFilterIndexes)) + server := utils.NewServer() + srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, + chS /*cdrStorage*/, nil, + /*loadStorage*/ nil, filterSChan, + server, nil, engineShutdown) + chrS := NewChargerService() + srvMngr.AddService(NewAttributeService(), chrS) + if err = srvMngr.StartServices(); err != nil { + t.Error(err) + } + if chrS.IsRunning() { + t.Errorf("Expected service to be down") + } + var reply string + if err = cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"), + Section: config.ChargerSCfgJson, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expecting OK ,received %s", reply) + } + time.Sleep(10 * time.Millisecond) //need to switch to gorutine + if !chrS.IsRunning() { + t.Errorf("Expected service to be running") + } + cfg.ChargerSCfg().Enabled = false + cfg.GetReloadChan(config.ChargerSCfgJson) <- struct{}{} + time.Sleep(10 * time.Millisecond) + if chrS.IsRunning() { + t.Errorf("Expected service to be down") + } + engineShutdown <- true +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 0cab5267b..093def4ce 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -36,18 +36,20 @@ import ( func NewServiceManager(cfg *config.CGRConfig, dm *engine.DataManager, cacheS *engine.CacheS, cdrStorage engine.CdrStorage, loadStorage engine.LoadStorage, filterSChan chan *engine.FilterS, - server *utils.Server, engineShutdown chan bool) *ServiceManager { + server *utils.Server, dispatcherSChan chan rpcclient.RpcClientConnection, + engineShutdown chan bool) *ServiceManager { sm := &ServiceManager{ cfg: cfg, dm: dm, engineShutdown: engineShutdown, cacheS: cacheS, - cdrStorage: cdrStorage, - loadStorage: loadStorage, - filterS: filterSChan, - server: server, - subsystems: make(map[string]Service), + cdrStorage: cdrStorage, + loadStorage: loadStorage, + filterS: filterSChan, + server: server, + subsystems: make(map[string]Service), + dispatcherSChan: dispatcherSChan, } return sm } @@ -66,6 +68,8 @@ type ServiceManager struct { filterS chan *engine.FilterS server *utils.Server subsystems map[string]Service + + dispatcherSChan chan rpcclient.RpcClientConnection } func (srvMngr *ServiceManager) StartScheduler(waitCache bool) error { @@ -262,12 +266,19 @@ func (srvMngr *ServiceManager) GetExitChan() chan bool { // GetConnection creates a rpcClient to the specified subsystem func (srvMngr *ServiceManager) GetConnection(subsystem string, conns []*config.RemoteHost) (rpcclient.RpcClientConnection, error) { + if len(conns) == 0 { + return nil, nil + } + internalChan := srvMngr.subsystems[subsystem].GetIntenternalChan() + if srvMngr.GetConfig().DispatcherSCfg().Enabled { + internalChan = srvMngr.dispatcherSChan + } return engine.NewRPCPool(rpcclient.POOL_FIRST, srvMngr.cfg.TlsCfg().ClientKey, srvMngr.cfg.TlsCfg().ClientCerificate, srvMngr.cfg.TlsCfg().CaCertificate, srvMngr.cfg.GeneralCfg().ConnectAttempts, srvMngr.cfg.GeneralCfg().Reconnects, srvMngr.cfg.GeneralCfg().ConnectTimeout, srvMngr.cfg.GeneralCfg().ReplyTimeout, - conns, srvMngr.subsystems[subsystem].GetIntenternalChan(), false) + conns, internalChan, false) } // StartServices starts all enabled services @@ -284,6 +295,17 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } }() } + if srvMngr.cfg.ChargerSCfg().Enabled { + go func() { + if chrS, has := srvMngr.subsystems[utils.ChargerS]; !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS)) + srvMngr.engineShutdown <- true + } else if err = chrS.Start(srvMngr, true); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ChargerS, err)) + srvMngr.engineShutdown <- true + } + }() + } // startServer() return @@ -334,6 +356,34 @@ func (srvMngr *ServiceManager) handleReload() { return // stop if we encounter an error } } + case <-srvMngr.cfg.GetReloadChan(config.ChargerSCfgJson): + chrS, has := srvMngr.subsystems[utils.ChargerS] + if !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + if srvMngr.cfg.ChargerSCfg().Enabled { + if chrS.IsRunning() { + if err = chrS.Reload(srvMngr); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to reload <%s>", utils.ServiceManager, utils.ChargerS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + } else { + if err = chrS.Start(srvMngr, true); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + } + } else if chrS.IsRunning() { + if err = chrS.Shutdown(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to stop service <%s>", utils.ServiceManager, utils.ChargerS)) + srvMngr.engineShutdown <- true + return // stop if we encounter an error + } + } } // handle RPC server }