mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-12 02:26:26 +05:00
Implement start/stop/status methods for servmanager + tests
This commit is contained in:
committed by
Dan Christian Bogos
parent
ad7bd111d1
commit
60a4da69e0
45
apis/servmanager.go
Normal file
45
apis/servmanager.go
Normal file
@@ -0,0 +1,45 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package apis
|
||||
|
||||
import (
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
)
|
||||
|
||||
func NewServiceManagerV1(sm *servmanager.ServiceManager) *ServiceManagerV1 {
|
||||
return &ServiceManagerV1{sm: sm}
|
||||
}
|
||||
|
||||
type ServiceManagerV1 struct {
|
||||
sm *servmanager.ServiceManager
|
||||
ping
|
||||
}
|
||||
|
||||
func (servManager *ServiceManagerV1) StartService(ctx *context.Context, args *servmanager.ArgsServiceID, reply *string) (err error) {
|
||||
return servManager.sm.V1StartService(ctx, args, reply)
|
||||
}
|
||||
|
||||
func (servManager *ServiceManagerV1) StopService(ctx *context.Context, args *servmanager.ArgsServiceID, reply *string) (err error) {
|
||||
return servManager.sm.V1StopService(ctx, args, reply)
|
||||
}
|
||||
|
||||
func (servManager *ServiceManagerV1) ServiceStatus(ctx *context.Context, args *servmanager.ArgsServiceID, reply *string) (err error) {
|
||||
return servManager.sm.V1ServiceStatus(ctx, args, reply)
|
||||
}
|
||||
212
apis/servmanager_it_test.go
Normal file
212
apis/servmanager_it_test.go
Normal file
@@ -0,0 +1,212 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package apis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
srvMngCfgPath string
|
||||
srvMngCfg *config.CGRConfig
|
||||
srvMngRPC *birpc.Client
|
||||
srvMngConfigDIR string //run tests for specific configuration
|
||||
|
||||
sTestsServManager = []func(t *testing.T){
|
||||
testSrvMngInitCfg,
|
||||
testSrvMngInitDataDb,
|
||||
testSrvMngStartEngine,
|
||||
testSrvMngSRPCConn,
|
||||
|
||||
testSrvMngPing,
|
||||
|
||||
testSrvMngSKillEngine,
|
||||
}
|
||||
)
|
||||
|
||||
func TestServManagerIT(t *testing.T) {
|
||||
switch *dbType {
|
||||
case utils.MetaInternal:
|
||||
srvMngConfigDIR = "apis_srvmng_internal"
|
||||
case utils.MetaMongo:
|
||||
srvMngConfigDIR = "apis_srvmng_mongo"
|
||||
case utils.MetaMySQL:
|
||||
srvMngConfigDIR = "apis_srvmng_mysql"
|
||||
case utils.MetaPostgres:
|
||||
t.SkipNow()
|
||||
default:
|
||||
t.Fatal("Unknown Database type")
|
||||
}
|
||||
for _, stest := range sTestsServManager {
|
||||
t.Run(srvMngConfigDIR, stest)
|
||||
}
|
||||
}
|
||||
|
||||
func testSrvMngInitCfg(t *testing.T) {
|
||||
var err error
|
||||
srvMngCfgPath = path.Join(*dataDir, "conf", "samples", srvMngConfigDIR)
|
||||
srvMngCfg, err = config.NewCGRConfigFromPath(context.Background(), srvMngCfgPath)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testSrvMngInitDataDb(t *testing.T) {
|
||||
if err := engine.InitDataDB(srvMngCfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Start CGR Engine
|
||||
func testSrvMngStartEngine(t *testing.T) {
|
||||
if _, err := engine.StopStartEngine(srvMngCfgPath, *waitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testSrvMngSRPCConn(t *testing.T) {
|
||||
var err error
|
||||
srvMngRPC, err = newRPCClient(srvMngCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Kill the engine when it is about to be finished
|
||||
func testSrvMngSKillEngine(t *testing.T) {
|
||||
if err := engine.KillEngine(100); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testSrvMngPing(t *testing.T) {
|
||||
var reply string
|
||||
if err := srvMngRPC.Call(context.Background(), utils.ServiceManagerV1Ping, nil,
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.Pong {
|
||||
t.Errorf("Unexpected reply: %s", reply)
|
||||
}
|
||||
|
||||
serviceToMethod := map[string]string{
|
||||
utils.AdminS: utils.AdminSv1Ping,
|
||||
utils.AccountS: utils.AccountSv1Ping,
|
||||
utils.ActionS: utils.ActionSv1Ping,
|
||||
utils.AnalyzerS: utils.AnalyzerSv1Ping,
|
||||
utils.AttributeS: utils.AttributeSv1Ping,
|
||||
utils.CDRServer: utils.CDRsV1Ping,
|
||||
utils.ChargerS: utils.ChargerSv1Ping,
|
||||
// utils.DispatcherS: utils.DispatcherSv1Ping,
|
||||
utils.EEs: utils.EeSv1Ping,
|
||||
utils.EFs: utils.EfSv1Ping,
|
||||
utils.RateS: utils.RateSv1Ping,
|
||||
utils.ResourceS: utils.ResourceSv1Ping,
|
||||
utils.RouteS: utils.RouteSv1Ping,
|
||||
utils.SessionS: utils.SessionSv1Ping,
|
||||
utils.StatS: utils.StatSv1Ping,
|
||||
utils.ThresholdS: utils.ThresholdSv1Ping,
|
||||
utils.TPeS: utils.TPeSv1Ping,
|
||||
}
|
||||
|
||||
/*
|
||||
Run the following tests for each service:
|
||||
|
||||
- ping before enabling service (expect can't find service error)
|
||||
- query for service status (expect "STOPPED" reply)
|
||||
- start the service
|
||||
- query for service status (expect "RUNNING" reply)
|
||||
- ping (expect "Pong")
|
||||
- stop service
|
||||
- query for service status (expect "STOPPED" reply)
|
||||
- ping after stopping service (expect "can't find service" error)
|
||||
*/
|
||||
|
||||
for serviceID, serviceMethod := range serviceToMethod {
|
||||
t.Run(fmt.Sprintf("test for service %s", serviceID), func(t *testing.T) {
|
||||
rpcErr := fmt.Sprintf("rpc: can't find service %s", serviceMethod)
|
||||
if err := srvMngRPC.Call(context.Background(), serviceMethod, nil,
|
||||
&reply); err == nil || err.Error() != rpcErr {
|
||||
t.Errorf("expected: <%+v>,\nreceived: <%+v>", rpcErr, err)
|
||||
}
|
||||
|
||||
args := servmanager.ArgsServiceID{
|
||||
ServiceID: serviceID,
|
||||
}
|
||||
|
||||
if err := srvMngRPC.Call(context.Background(), utils.ServiceManagerV1ServiceStatus, args,
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.StoppedCaps {
|
||||
t.Errorf("Unexpected reply: %s", reply)
|
||||
}
|
||||
|
||||
if err := srvMngRPC.Call(context.Background(), utils.ServiceManagerV1StartService, args,
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Unexpected reply: %s", reply)
|
||||
}
|
||||
|
||||
if err := srvMngRPC.Call(context.Background(), utils.ServiceManagerV1ServiceStatus, args,
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.RunningCaps {
|
||||
t.Errorf("Unexpected reply: %s", reply)
|
||||
}
|
||||
|
||||
if err := srvMngRPC.Call(context.Background(), serviceMethod, nil,
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.Pong {
|
||||
t.Errorf("Unexpected reply: %s", reply)
|
||||
}
|
||||
|
||||
if err := srvMngRPC.Call(context.Background(), utils.ServiceManagerV1StopService, args,
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Unexpected reply: %s", reply)
|
||||
}
|
||||
|
||||
if err := srvMngRPC.Call(context.Background(), utils.ServiceManagerV1ServiceStatus, args,
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.StoppedCaps {
|
||||
t.Errorf("Unexpected reply: %s", reply)
|
||||
}
|
||||
|
||||
if err := srvMngRPC.Call(context.Background(), serviceMethod, nil,
|
||||
&reply); err == nil || err.Error() != rpcErr {
|
||||
t.Errorf("expected: <%+v>,\nreceived: <%+v>", rpcErr, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
18
data/conf/samples/apis_srvmng_internal/cgrates.json
Normal file
18
data/conf/samples/apis_srvmng_internal/cgrates.json
Normal file
@@ -0,0 +1,18 @@
|
||||
{
|
||||
|
||||
"general": {
|
||||
"log_level": 7,
|
||||
"reply_timeout": "50s"
|
||||
},
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":2012",
|
||||
"rpc_gob": ":2013",
|
||||
"http": ":2080"
|
||||
},
|
||||
|
||||
"data_db": {
|
||||
"db_type": "*internal"
|
||||
}
|
||||
|
||||
}
|
||||
20
data/conf/samples/apis_srvmng_mongo/cgrates.json
Normal file
20
data/conf/samples/apis_srvmng_mongo/cgrates.json
Normal file
@@ -0,0 +1,20 @@
|
||||
{
|
||||
|
||||
"general": {
|
||||
"log_level": 7,
|
||||
"reply_timeout": "50s"
|
||||
},
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":2012",
|
||||
"rpc_gob": ":2013",
|
||||
"http": ":2080"
|
||||
},
|
||||
|
||||
"data_db": {
|
||||
"db_type": "mongo",
|
||||
"db_name": "10",
|
||||
"db_port": 27017
|
||||
}
|
||||
|
||||
}
|
||||
20
data/conf/samples/apis_srvmng_mysql/cgrates.json
Normal file
20
data/conf/samples/apis_srvmng_mysql/cgrates.json
Normal file
@@ -0,0 +1,20 @@
|
||||
{
|
||||
|
||||
"general": {
|
||||
"log_level": 7,
|
||||
"reply_timeout": "50s"
|
||||
},
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":2012",
|
||||
"rpc_gob": ":2013",
|
||||
"http": ":2080"
|
||||
},
|
||||
|
||||
"data_db": {
|
||||
"db_type": "redis",
|
||||
"db_port": 6379,
|
||||
"db_name": "10"
|
||||
}
|
||||
|
||||
}
|
||||
@@ -48,7 +48,7 @@ func TestAccountSReload(t *testing.T) {
|
||||
chSCh <- chS
|
||||
css := &CacheService{cacheCh: chSCh}
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
acctRPC := make(chan birpc.ClientConnector, 1)
|
||||
|
||||
@@ -48,7 +48,7 @@ func TestActionSReload(t *testing.T) {
|
||||
chSCh <- chS
|
||||
css := &CacheService{cacheCh: chSCh}
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
actRPC := make(chan birpc.ClientConnector, 1)
|
||||
|
||||
@@ -47,7 +47,7 @@ func TestAnalyzerSReload(t *testing.T) {
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anzRPC := make(chan birpc.ClientConnector, 1)
|
||||
|
||||
@@ -49,7 +49,7 @@ func TestAsteriskAgentReload(t *testing.T) {
|
||||
shdWg := new(sync.WaitGroup)
|
||||
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
@@ -107,7 +107,7 @@ func TestAsteriskAgentReload2(t *testing.T) {
|
||||
shdWg := new(sync.WaitGroup)
|
||||
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
|
||||
@@ -48,7 +48,7 @@ func TestAttributeSReload(t *testing.T) {
|
||||
chSCh <- chS
|
||||
css := &CacheService{cacheCh: chSCh}
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
attrRPC := make(chan birpc.ClientConnector, 1)
|
||||
|
||||
@@ -51,7 +51,7 @@ func TestCdrsReload(t *testing.T) {
|
||||
|
||||
cfg.ChargerSCfg().Enabled = true
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
|
||||
@@ -47,7 +47,7 @@ func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.Wai
|
||||
cM: cM, // connection manager
|
||||
caps: caps, // caps is used to limit RPC CPS
|
||||
shdWg: shdWg, // wait for shutdown
|
||||
srvManager: servmanager.NewServiceManager(shdWg, cM, cfg.GetReloadChan()),
|
||||
srvManager: servmanager.NewServiceManager(shdWg, cM, cfg),
|
||||
server: server, // Rpc/http server
|
||||
srvDep: map[string]*sync.WaitGroup{
|
||||
utils.AnalyzerS: new(sync.WaitGroup),
|
||||
|
||||
@@ -53,7 +53,7 @@ func TestChargerSReload(t *testing.T) {
|
||||
filterSChan <- nil
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
attrS := NewAttributeService(cfg, db, css, filterSChan, server, make(chan birpc.ClientConnector, 1), anz, &DispatcherService{srvsReload: make(map[string]chan struct{})}, srvDep)
|
||||
|
||||
@@ -42,7 +42,7 @@ func TestCoreSReload(t *testing.T) {
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
coreRPC := make(chan birpc.ClientConnector, 1)
|
||||
|
||||
@@ -50,7 +50,7 @@ func TestDataDBReload(t *testing.T) {
|
||||
css := &CacheService{cacheCh: chSCh}
|
||||
server := cores.NewServer(nil)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
cM := engine.NewConnManager(cfg)
|
||||
db := NewDataDBService(cfg, cM, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
|
||||
@@ -44,7 +44,7 @@ func TestDiameterAgentReload1(t *testing.T) {
|
||||
shdWg := new(sync.WaitGroup)
|
||||
server := cores.NewServer(nil)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
sS := NewSessionService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1),
|
||||
|
||||
@@ -53,7 +53,7 @@ func TestDispatcherSReload(t *testing.T) {
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
|
||||
@@ -51,7 +51,7 @@ func TestDNSAgentReload(t *testing.T) {
|
||||
shdWg := new(sync.WaitGroup)
|
||||
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
|
||||
@@ -53,7 +53,7 @@ func TestEventExporterSReload(t *testing.T) {
|
||||
filterSChan <- nil
|
||||
shdWg := new(sync.WaitGroup)
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
chS := engine.NewCacheS(cfg, nil, nil, nil)
|
||||
|
||||
@@ -60,7 +60,7 @@ func TestEventReaderSReload(t *testing.T) {
|
||||
}()
|
||||
shdWg := new(sync.WaitGroup)
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
|
||||
@@ -51,7 +51,7 @@ func TestFreeSwitchAgentReload(t *testing.T) {
|
||||
shdWg := new(sync.WaitGroup)
|
||||
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
|
||||
@@ -49,7 +49,7 @@ func TestHTTPAgentReload(t *testing.T) {
|
||||
}()
|
||||
shdWg := new(sync.WaitGroup)
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
|
||||
@@ -48,7 +48,7 @@ func TestKamailioAgentReload(t *testing.T) {
|
||||
shdWg := new(sync.WaitGroup)
|
||||
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/apis"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -195,11 +196,9 @@ func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGR
|
||||
func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector,
|
||||
srvMngr *servmanager.ServiceManager, cfg *config.CGRConfig,
|
||||
server *cores.Server, anz *AnalyzerService) {
|
||||
srv, _ := engine.NewServiceWithName(srvMngr, utils.ServiceManager, true)
|
||||
srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false)
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
for _, s := range srv {
|
||||
server.RpcRegister(s)
|
||||
}
|
||||
server.RpcRegister(srv)
|
||||
}
|
||||
iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager)
|
||||
}
|
||||
|
||||
@@ -67,7 +67,7 @@ func TestLoaderSReload(t *testing.T) {
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
|
||||
@@ -52,7 +52,7 @@ func TestRadiusAgentReload(t *testing.T) {
|
||||
shdWg := new(sync.WaitGroup)
|
||||
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
@@ -112,7 +112,7 @@ func TestRadiusAgentReload2(t *testing.T) {
|
||||
shdWg := new(sync.WaitGroup)
|
||||
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
|
||||
@@ -43,7 +43,7 @@ func TestRateSReload(t *testing.T) {
|
||||
filterSChan <- nil
|
||||
shdWg := new(sync.WaitGroup)
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
chS := engine.NewCacheS(cfg, nil, nil, nil)
|
||||
|
||||
@@ -50,7 +50,7 @@ func TestDispatcherHReload(t *testing.T) {
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
|
||||
@@ -55,7 +55,7 @@ func TestResourceSReload(t *testing.T) {
|
||||
chSCh <- chS
|
||||
css := &CacheService{cacheCh: chSCh}
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
|
||||
@@ -50,7 +50,7 @@ func TestRouteSReload(t *testing.T) {
|
||||
chSCh <- chS
|
||||
css := &CacheService{cacheCh: chSCh}
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
|
||||
@@ -46,7 +46,7 @@ func TestSIPAgentReload(t *testing.T) {
|
||||
shdWg := new(sync.WaitGroup)
|
||||
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
|
||||
@@ -55,7 +55,7 @@ func TestStatSReload(t *testing.T) {
|
||||
chSCh <- chS
|
||||
css := &CacheService{cacheCh: chSCh}
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
|
||||
@@ -51,7 +51,7 @@ func TestThresholdSReload(t *testing.T) {
|
||||
chSCh <- chS
|
||||
css := &CacheService{cacheCh: chSCh}
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
@@ -120,7 +120,7 @@ func TestThresholdSReload2(t *testing.T) {
|
||||
chSCh <- chS
|
||||
css := &CacheService{cacheCh: chSCh}
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan())
|
||||
srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
|
||||
@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package servmanager
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
@@ -29,18 +30,20 @@ import (
|
||||
)
|
||||
|
||||
// NewServiceManager returns a service manager
|
||||
func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager, rldChan <-chan string) *ServiceManager {
|
||||
func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager, cfg *config.CGRConfig) *ServiceManager {
|
||||
return &ServiceManager{
|
||||
cfg: cfg,
|
||||
subsystems: make(map[string]Service),
|
||||
shdWg: shdWg,
|
||||
connMgr: connMgr,
|
||||
rldChan: rldChan,
|
||||
rldChan: cfg.GetReloadChan(),
|
||||
}
|
||||
}
|
||||
|
||||
// ServiceManager handles service management ran by the engine
|
||||
type ServiceManager struct {
|
||||
sync.RWMutex // lock access to any shared data
|
||||
sync.RWMutex // lock access to any shared data
|
||||
cfg *config.CGRConfig
|
||||
subsystems map[string]Service // active subsystems managed by SM
|
||||
|
||||
shdWg *sync.WaitGroup // list of shutdown items
|
||||
@@ -161,3 +164,143 @@ type Service interface {
|
||||
// ServiceName returns the service name
|
||||
ServiceName() string
|
||||
}
|
||||
|
||||
// ArgsServiceID are passed to Start/Stop/Status RPC methods
|
||||
type ArgsServiceID struct {
|
||||
ServiceID string
|
||||
APIOpts map[string]interface{}
|
||||
}
|
||||
|
||||
// V1StartService starts a service with ID
|
||||
func (srvMngr *ServiceManager) V1StartService(ctx *context.Context, args *ArgsServiceID, reply *string) (err error) {
|
||||
err = toggleService(args.ServiceID, true, srvMngr)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// V1StopService shuts-down a service with ID
|
||||
func (srvMngr *ServiceManager) V1StopService(ctx *context.Context, args *ArgsServiceID, reply *string) (err error) {
|
||||
err = toggleService(args.ServiceID, false, srvMngr)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
// V1ServiceStatus returns the service status
|
||||
func (srvMngr *ServiceManager) V1ServiceStatus(ctx *context.Context, args *ArgsServiceID, reply *string) error {
|
||||
srvMngr.RLock()
|
||||
defer srvMngr.RUnlock()
|
||||
|
||||
srv := srvMngr.GetService(args.ServiceID)
|
||||
if srv == nil {
|
||||
return utils.ErrUnsupportedServiceID
|
||||
}
|
||||
running := srv.IsRunning()
|
||||
|
||||
if running {
|
||||
*reply = utils.RunningCaps
|
||||
} else {
|
||||
*reply = utils.StoppedCaps
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetConfig returns the Configuration
|
||||
func (srvMngr *ServiceManager) GetConfig() *config.CGRConfig {
|
||||
srvMngr.RLock()
|
||||
defer srvMngr.RUnlock()
|
||||
return srvMngr.cfg
|
||||
}
|
||||
|
||||
func toggleService(serviceID string, status bool, srvMngr *ServiceManager) (err error) {
|
||||
srvMngr.Lock()
|
||||
defer srvMngr.Unlock()
|
||||
switch serviceID {
|
||||
case utils.AccountS:
|
||||
srvMngr.cfg.AccountSCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.ActionS:
|
||||
srvMngr.cfg.ActionSCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.AdminS:
|
||||
srvMngr.cfg.AdminSCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.AnalyzerS:
|
||||
srvMngr.cfg.AnalyzerSCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.AttributeS:
|
||||
srvMngr.cfg.AttributeSCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.CDRServer:
|
||||
srvMngr.cfg.CdrsCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.ChargerS:
|
||||
srvMngr.cfg.ChargerSCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.DispatcherS:
|
||||
srvMngr.cfg.DispatcherSCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.EEs:
|
||||
srvMngr.cfg.EEsCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.EFs:
|
||||
srvMngr.cfg.EFsCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.ERs:
|
||||
srvMngr.cfg.ERsCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
// case utils.LoaderS:
|
||||
// srvMngr.cfg.LoaderCfg()[0].Enabled = status
|
||||
// srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.RateS:
|
||||
srvMngr.cfg.RateSCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.ResourceS:
|
||||
srvMngr.cfg.ResourceSCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.RouteS:
|
||||
srvMngr.cfg.RouteSCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.SessionS:
|
||||
srvMngr.cfg.SessionSCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.StatS:
|
||||
srvMngr.cfg.StatSCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.ThresholdS:
|
||||
srvMngr.cfg.ThresholdSCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.TPeS:
|
||||
srvMngr.cfg.TpeSCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.AsteriskAgent:
|
||||
srvMngr.cfg.AsteriskAgentCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.DiameterAgent:
|
||||
srvMngr.cfg.DiameterAgentCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.DNSAgent:
|
||||
srvMngr.cfg.DNSAgentCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.FreeSWITCHAgent:
|
||||
srvMngr.cfg.FsAgentCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.KamailioAgent:
|
||||
srvMngr.cfg.KamAgentCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.RadiusAgent:
|
||||
srvMngr.cfg.RadiusAgentCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
case utils.SIPAgent:
|
||||
srvMngr.cfg.SIPAgentCfg().Enabled = status
|
||||
srvMngr.cfg.GetReloadChan() <- serviceID
|
||||
default:
|
||||
err = errors.New(utils.UnsupportedServiceIDCaps)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -79,6 +79,7 @@ var (
|
||||
ErrNegative = errors.New("NEGATIVE")
|
||||
ErrUnsupportedTPExporterType = errors.New("UNSUPPORTED_TPEXPORTER_TYPE")
|
||||
ErrCastFailed = errors.New("CAST_FAILED")
|
||||
ErrUnsupportedServiceID = errors.New(UnsupportedServiceIDCaps)
|
||||
|
||||
ErrMap = map[string]error{
|
||||
ErrNoMoreData.Error(): ErrNoMoreData,
|
||||
|
||||
Reference in New Issue
Block a user