diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index c65a3f66a..bef98c979 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -1692,10 +1692,12 @@ func main() {
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, dm, cacheS, cdrDb,
- loadDb, filterSChan, server, exitChan)
+ loadDb, filterSChan, server, internalDispatcherSChan, exitChan)
attrS := services.NewAttributeService()
- srvManager.AddService(attrS)
+ chrS := services.NewChargerService()
+ srvManager.AddService(attrS, chrS)
internalAttributeSChan = attrS.GetIntenternalChan()
+ internalChargerSChan = chrS.GetIntenternalChan()
go srvManager.StartServices()
initServiceManagerV1(internalServeManagerChan, srvManager, server)
@@ -1809,11 +1811,11 @@ func main() {
// go startAttributeService(internalAttributeSChan, cacheS,
// cfg, dm, server, filterSChan, exitChan)
// }
- if cfg.ChargerSCfg().Enabled {
- go startChargerService(internalChargerSChan, internalAttributeSChan,
- internalDispatcherSChan, cacheS, cfg, dm, server,
- filterSChan, exitChan)
- }
+ // if cfg.ChargerSCfg().Enabled {
+ // go startChargerService(internalChargerSChan, internalAttributeSChan,
+ // internalDispatcherSChan, cacheS, cfg, dm, server,
+ // filterSChan, exitChan)
+ // }
// Start RL service
if cfg.ResourceSCfg().Enabled {
diff --git a/config/config.go b/config/config.go
index c0102f532..d2cda7aef 100755
--- a/config/config.go
+++ b/config/config.go
@@ -1155,7 +1155,10 @@ func (cfg *CGRConfig) AttributeSCfg() *AttributeSCfg {
return cfg.attributeSCfg
}
+// ChargerSCfg returns the config for ChargerS
func (cfg *CGRConfig) ChargerSCfg() *ChargerSCfg {
+ cfg.lks[ChargerSCfgJson].Lock()
+ defer cfg.lks[ChargerSCfgJson].Unlock()
return cfg.chargerSCfg
}
diff --git a/config/config_it_test.go b/config/config_it_test.go
index 1718b3038..21de6aea2 100644
--- a/config/config_it_test.go
+++ b/config/config_it_test.go
@@ -81,7 +81,7 @@ func TestNewCGRConfigFromPath(t *testing.T) {
}
}
-func TestCGRConfigReload(t *testing.T) {
+func TestCGRConfigReloadAttributeS(t *testing.T) {
cfg, err := NewDefaultCGRConfig()
if err != nil {
t.Fatal(err)
@@ -107,6 +107,37 @@ func TestCGRConfigReload(t *testing.T) {
}
}
+func TestCGRConfigReloadChargerS(t *testing.T) {
+ cfg, err := NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ var reply string
+ if err = cfg.V1ReloadConfig(&ConfigReloadWithArgDispatcher{
+ Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo2"),
+ Section: ChargerSCfgJson,
+ }, &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("Expected OK received: %s", reply)
+ }
+ expAttr := &ChargerSCfg{
+ Enabled: true,
+ StringIndexedFields: &[]string{utils.Account},
+ PrefixIndexedFields: &[]string{},
+ IndexedSelects: true,
+ AttributeSConns: []*RemoteHost{
+ &RemoteHost{
+ Address: "127.0.0.1:2012",
+ Transport: utils.MetaJSONrpc,
+ },
+ },
+ }
+ if !reflect.DeepEqual(expAttr, cfg.ChargerSCfg()) {
+ t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.ChargerSCfg()))
+ }
+}
+
func TestCgrCfgV1ReloadConfigSection(t *testing.T) {
for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out"} {
if err := os.RemoveAll(dir); err != nil {
diff --git a/integration_test.sh b/integration_test.sh
index 10fc89d5e..21c042bbb 100755
--- a/integration_test.sh
+++ b/integration_test.sh
@@ -41,8 +41,11 @@ dis=$?
echo 'go test github.com/cgrates/cgrates/loaders -tags=integration'
go test github.com/cgrates/cgrates/loaders -tags=integration
lds=$?
+echo 'go test github.com/cgrates/cgrates/services -tags=integration'
+go test github.com/cgrates/cgrates/services -tags=integration
+srv=$?
echo 'go test github.com/cgrates/cgrates/apier/v1 -tags=offline'
go test github.com/cgrates/cgrates/apier/v1 -tags=offline
offline=$?
-exit $gen && $ap1 && $ap2 && $en && $cdrc && $cfg && $utl && $gnr && $agts && $smg && $mgr && $dis && $lds && $ers && offline
+exit $gen && $ap1 && $ap2 && $en && $cdrc && $cfg && $utl && $gnr && $agts && $smg && $mgr && $dis && $lds && $ers && $srv && $offline
diff --git a/services/attributes.go b/services/attributes.go
index 41b4191fe..ba0f45b29 100644
--- a/services/attributes.go
+++ b/services/attributes.go
@@ -28,6 +28,7 @@ import (
"github.com/cgrates/rpcclient"
)
+// NewAttributeService returns the Attribute Service
func NewAttributeService() servmanager.Service {
return &AttributeService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
diff --git a/services/attributes_it_test.go b/services/attributes_it_test.go
index 3113c7517..ec104d84b 100644
--- a/services/attributes_it_test.go
+++ b/services/attributes_it_test.go
@@ -41,13 +41,15 @@ func TestAttributeSReload(t *testing.T) {
chS := engine.NewCacheS(cfg, nil)
close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles))
close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes))
+ close(chS.GetPrecacheChannel(utils.CacheChargerProfiles))
+ close(chS.GetPrecacheChannel(utils.CacheChargerFilterIndexes))
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
chS /*cdrStorage*/, nil,
/*loadStorage*/ nil, filterSChan,
- server, engineShutdown)
+ server, nil, engineShutdown)
attrS := NewAttributeService()
- srvMngr.AddService(attrS)
+ srvMngr.AddService(attrS, NewChargerService())
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}
@@ -55,17 +57,21 @@ func TestAttributeSReload(t *testing.T) {
t.Errorf("Expected service to be down")
}
var reply string
- cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{
+ if err := cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{
Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"),
Section: config.ATTRIBUTE_JSN,
- }, &reply)
- time.Sleep(1) //need to switch to gorutine
+ }, &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("Expecting OK ,received %s", reply)
+ }
+ time.Sleep(10 * time.Millisecond) //need to switch to gorutine
if !attrS.IsRunning() {
t.Errorf("Expected service to be running")
}
cfg.AttributeSCfg().Enabled = false
cfg.GetReloadChan(config.ATTRIBUTE_JSN) <- struct{}{}
- time.Sleep(1)
+ time.Sleep(10 * time.Millisecond)
if attrS.IsRunning() {
t.Errorf("Expected service to be down")
}
diff --git a/services/chargers.go b/services/chargers.go
new file mode 100644
index 000000000..b44dad7d8
--- /dev/null
+++ b/services/chargers.go
@@ -0,0 +1,111 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package services
+
+import (
+ "fmt"
+
+ v1 "github.com/cgrates/cgrates/apier/v1"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/servmanager"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/rpcclient"
+)
+
+// NewChargerService returns the Charger Service
+func NewChargerService() servmanager.Service {
+ return &ChargerService{
+ connChan: make(chan rpcclient.RpcClientConnection, 1),
+ }
+}
+
+// ChargerService implements Service interface
+type ChargerService struct {
+ chrS *engine.ChargerService
+ rpc *v1.ChargerSv1
+ connChan chan rpcclient.RpcClientConnection
+}
+
+// Start should handle the sercive start
+func (chrS *ChargerService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
+ if chrS.IsRunning() {
+ return fmt.Errorf("service aleady running")
+ }
+ if waitCache {
+ <-sp.GetCacheS().GetPrecacheChannel(utils.CacheChargerProfiles)
+ <-sp.GetCacheS().GetPrecacheChannel(utils.CacheChargerFilterIndexes)
+ }
+ var chrSConn rpcclient.RpcClientConnection
+ if chrSConn, err = sp.GetConnection(utils.AttributeS, sp.GetConfig().ChargerSCfg().AttributeSConns); err != nil {
+ utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
+ utils.ChargerS, utils.AttributeS, err.Error()))
+ return
+ }
+
+ if chrS.chrS, err = engine.NewChargerService(sp.GetDM(), sp.GetFilterS(), chrSConn, sp.GetConfig()); err != nil {
+ utils.Logger.Crit(
+ fmt.Sprintf("<%s> Could not init, error: %s",
+ utils.ChargerS, err.Error()))
+ return
+ }
+ utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ChargerS))
+ cSv1 := v1.NewChargerSv1(chrS.chrS)
+ if !sp.GetConfig().DispatcherSCfg().Enabled {
+ sp.GetServer().RpcRegister(cSv1)
+ }
+ chrS.connChan <- cSv1
+ return
+}
+
+// GetIntenternalChan returns the internal connection chanel
+func (chrS *ChargerService) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
+ return chrS.connChan
+}
+
+// Reload handles the change of config
+func (chrS *ChargerService) Reload(sp servmanager.ServiceProvider) (err error) {
+ // need to reload the connection to the attributeS
+ return
+}
+
+// Shutdown stops the service
+func (chrS *ChargerService) Shutdown() (err error) {
+ if err = chrS.chrS.Shutdown(); err != nil {
+ return
+ }
+ chrS.chrS = nil
+ chrS.rpc = nil
+ <-chrS.connChan
+ return
+}
+
+// GetRPCInterface returns the interface to register for server
+func (chrS *ChargerService) GetRPCInterface() interface{} {
+ return chrS.rpc
+}
+
+// IsRunning returns if the service is running
+func (chrS *ChargerService) IsRunning() bool {
+ return chrS != nil && chrS.chrS != nil
+}
+
+// ServiceName returns the service name
+func (chrS *ChargerService) ServiceName() string {
+ return utils.ChargerS
+}
diff --git a/services/chargers_it_test.go b/services/chargers_it_test.go
new file mode 100644
index 000000000..8f902f757
--- /dev/null
+++ b/services/chargers_it_test.go
@@ -0,0 +1,81 @@
+// +build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package services
+
+import (
+ "path"
+ "testing"
+ "time"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/servmanager"
+ "github.com/cgrates/cgrates/utils"
+)
+
+func TestChargerSReload(t *testing.T) {
+ // utils.Logger.SetLogLevel(7)
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ cfg.AttributeSCfg().Enabled = true
+ filterSChan := make(chan *engine.FilterS, 1)
+ filterSChan <- nil
+ engineShutdown := make(chan bool, 1)
+ chS := engine.NewCacheS(cfg, nil)
+ close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles))
+ close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes))
+ close(chS.GetPrecacheChannel(utils.CacheChargerProfiles))
+ close(chS.GetPrecacheChannel(utils.CacheChargerFilterIndexes))
+ server := utils.NewServer()
+ srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
+ chS /*cdrStorage*/, nil,
+ /*loadStorage*/ nil, filterSChan,
+ server, nil, engineShutdown)
+ chrS := NewChargerService()
+ srvMngr.AddService(NewAttributeService(), chrS)
+ if err = srvMngr.StartServices(); err != nil {
+ t.Error(err)
+ }
+ if chrS.IsRunning() {
+ t.Errorf("Expected service to be down")
+ }
+ var reply string
+ if err = cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{
+ Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"),
+ Section: config.ChargerSCfgJson,
+ }, &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("Expecting OK ,received %s", reply)
+ }
+ time.Sleep(10 * time.Millisecond) //need to switch to gorutine
+ if !chrS.IsRunning() {
+ t.Errorf("Expected service to be running")
+ }
+ cfg.ChargerSCfg().Enabled = false
+ cfg.GetReloadChan(config.ChargerSCfgJson) <- struct{}{}
+ time.Sleep(10 * time.Millisecond)
+ if chrS.IsRunning() {
+ t.Errorf("Expected service to be down")
+ }
+ engineShutdown <- true
+}
diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go
index 0cab5267b..093def4ce 100644
--- a/servmanager/servmanager.go
+++ b/servmanager/servmanager.go
@@ -36,18 +36,20 @@ import (
func NewServiceManager(cfg *config.CGRConfig, dm *engine.DataManager,
cacheS *engine.CacheS, cdrStorage engine.CdrStorage,
loadStorage engine.LoadStorage, filterSChan chan *engine.FilterS,
- server *utils.Server, engineShutdown chan bool) *ServiceManager {
+ server *utils.Server, dispatcherSChan chan rpcclient.RpcClientConnection,
+ engineShutdown chan bool) *ServiceManager {
sm := &ServiceManager{
cfg: cfg,
dm: dm,
engineShutdown: engineShutdown,
cacheS: cacheS,
- cdrStorage: cdrStorage,
- loadStorage: loadStorage,
- filterS: filterSChan,
- server: server,
- subsystems: make(map[string]Service),
+ cdrStorage: cdrStorage,
+ loadStorage: loadStorage,
+ filterS: filterSChan,
+ server: server,
+ subsystems: make(map[string]Service),
+ dispatcherSChan: dispatcherSChan,
}
return sm
}
@@ -66,6 +68,8 @@ type ServiceManager struct {
filterS chan *engine.FilterS
server *utils.Server
subsystems map[string]Service
+
+ dispatcherSChan chan rpcclient.RpcClientConnection
}
func (srvMngr *ServiceManager) StartScheduler(waitCache bool) error {
@@ -262,12 +266,19 @@ func (srvMngr *ServiceManager) GetExitChan() chan bool {
// GetConnection creates a rpcClient to the specified subsystem
func (srvMngr *ServiceManager) GetConnection(subsystem string, conns []*config.RemoteHost) (rpcclient.RpcClientConnection, error) {
+ if len(conns) == 0 {
+ return nil, nil
+ }
+ internalChan := srvMngr.subsystems[subsystem].GetIntenternalChan()
+ if srvMngr.GetConfig().DispatcherSCfg().Enabled {
+ internalChan = srvMngr.dispatcherSChan
+ }
return engine.NewRPCPool(rpcclient.POOL_FIRST,
srvMngr.cfg.TlsCfg().ClientKey,
srvMngr.cfg.TlsCfg().ClientCerificate, srvMngr.cfg.TlsCfg().CaCertificate,
srvMngr.cfg.GeneralCfg().ConnectAttempts, srvMngr.cfg.GeneralCfg().Reconnects,
srvMngr.cfg.GeneralCfg().ConnectTimeout, srvMngr.cfg.GeneralCfg().ReplyTimeout,
- conns, srvMngr.subsystems[subsystem].GetIntenternalChan(), false)
+ conns, internalChan, false)
}
// StartServices starts all enabled services
@@ -284,6 +295,17 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
}
}()
}
+ if srvMngr.cfg.ChargerSCfg().Enabled {
+ go func() {
+ if chrS, has := srvMngr.subsystems[utils.ChargerS]; !has {
+ utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS))
+ srvMngr.engineShutdown <- true
+ } else if err = chrS.Start(srvMngr, true); err != nil {
+ utils.Logger.Err(fmt.Sprintf("<%s> Failed to start %s because: %s", utils.ServiceManager, utils.ChargerS, err))
+ srvMngr.engineShutdown <- true
+ }
+ }()
+ }
// startServer()
return
@@ -334,6 +356,34 @@ func (srvMngr *ServiceManager) handleReload() {
return // stop if we encounter an error
}
}
+ case <-srvMngr.cfg.GetReloadChan(config.ChargerSCfgJson):
+ chrS, has := srvMngr.subsystems[utils.ChargerS]
+ if !has {
+ utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS))
+ srvMngr.engineShutdown <- true
+ return // stop if we encounter an error
+ }
+ if srvMngr.cfg.ChargerSCfg().Enabled {
+ if chrS.IsRunning() {
+ if err = chrS.Reload(srvMngr); err != nil {
+ utils.Logger.Err(fmt.Sprintf("<%s> Failed to reload <%s>", utils.ServiceManager, utils.ChargerS))
+ srvMngr.engineShutdown <- true
+ return // stop if we encounter an error
+ }
+ } else {
+ if err = chrS.Start(srvMngr, true); err != nil {
+ utils.Logger.Err(fmt.Sprintf("<%s> Failed to start <%s>", utils.ServiceManager, utils.ChargerS))
+ srvMngr.engineShutdown <- true
+ return // stop if we encounter an error
+ }
+ }
+ } else if chrS.IsRunning() {
+ if err = chrS.Shutdown(); err != nil {
+ utils.Logger.Err(fmt.Sprintf("<%s> Failed to stop service <%s>", utils.ServiceManager, utils.ChargerS))
+ srvMngr.engineShutdown <- true
+ return // stop if we encounter an error
+ }
+ }
}
// handle RPC server
}