From 60a4da69e0ebf195cea1bed3a4e4ddfcc12256ce Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 19 Oct 2022 18:59:34 +0300 Subject: [PATCH] Implement start/stop/status methods for servmanager + tests --- apis/servmanager.go | 45 ++++ apis/servmanager_it_test.go | 212 ++++++++++++++++++ .../samples/apis_srvmng_internal/cgrates.json | 18 ++ .../samples/apis_srvmng_mongo/cgrates.json | 20 ++ .../samples/apis_srvmng_mysql/cgrates.json | 20 ++ services/accounts_it_test.go | 2 +- services/actions_it_test.go | 2 +- services/analyzers_it_test.go | 2 +- services/asteriskagent_it_test.go | 4 +- services/attributes_it_test.go | 2 +- services/cdrs_it_test.go | 2 +- services/cgr-engine.go | 2 +- services/chargers_it_test.go | 2 +- services/cores_it_test.go | 2 +- services/datadb_it_test.go | 2 +- services/diameteragent_it_test.go | 2 +- services/dispatchers_it_test.go | 2 +- services/dnsagent_it_test.go | 2 +- services/ees_it_test.go | 2 +- services/ers_it_test.go | 2 +- services/freeswitchagent_it_test.go | 2 +- services/httpagent_it_test.go | 2 +- services/kamailioagent_it_test.go | 2 +- services/libcgr-engine.go | 7 +- services/loaders_it_test.go | 2 +- services/radiusagent_it_test.go | 4 +- services/rates_it_test.go | 2 +- services/registrarc_it_test.go | 2 +- services/resources_it_test.go | 2 +- services/routes_it_test.go | 2 +- services/sipagent_it_test.go | 2 +- services/stats_it_test.go | 2 +- services/thresholds_it_test.go | 4 +- servmanager/servmanager.go | 149 +++++++++++- utils/errors.go | 1 + 35 files changed, 495 insertions(+), 37 deletions(-) create mode 100644 apis/servmanager.go create mode 100644 apis/servmanager_it_test.go create mode 100644 data/conf/samples/apis_srvmng_internal/cgrates.json create mode 100644 data/conf/samples/apis_srvmng_mongo/cgrates.json create mode 100644 data/conf/samples/apis_srvmng_mysql/cgrates.json diff --git a/apis/servmanager.go b/apis/servmanager.go new file mode 100644 index 000000000..85accfae1 --- /dev/null +++ b/apis/servmanager.go @@ -0,0 +1,45 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package apis + +import ( + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/servmanager" +) + +func NewServiceManagerV1(sm *servmanager.ServiceManager) *ServiceManagerV1 { + return &ServiceManagerV1{sm: sm} +} + +type ServiceManagerV1 struct { + sm *servmanager.ServiceManager + ping +} + +func (servManager *ServiceManagerV1) StartService(ctx *context.Context, args *servmanager.ArgsServiceID, reply *string) (err error) { + return servManager.sm.V1StartService(ctx, args, reply) +} + +func (servManager *ServiceManagerV1) StopService(ctx *context.Context, args *servmanager.ArgsServiceID, reply *string) (err error) { + return servManager.sm.V1StopService(ctx, args, reply) +} + +func (servManager *ServiceManagerV1) ServiceStatus(ctx *context.Context, args *servmanager.ArgsServiceID, reply *string) (err error) { + return servManager.sm.V1ServiceStatus(ctx, args, reply) +} diff --git a/apis/servmanager_it_test.go b/apis/servmanager_it_test.go new file mode 100644 index 000000000..dd4079f7a --- /dev/null +++ b/apis/servmanager_it_test.go @@ -0,0 +1,212 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package apis + +import ( + "fmt" + "path" + "testing" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +var ( + srvMngCfgPath string + srvMngCfg *config.CGRConfig + srvMngRPC *birpc.Client + srvMngConfigDIR string //run tests for specific configuration + + sTestsServManager = []func(t *testing.T){ + testSrvMngInitCfg, + testSrvMngInitDataDb, + testSrvMngStartEngine, + testSrvMngSRPCConn, + + testSrvMngPing, + + testSrvMngSKillEngine, + } +) + +func TestServManagerIT(t *testing.T) { + switch *dbType { + case utils.MetaInternal: + srvMngConfigDIR = "apis_srvmng_internal" + case utils.MetaMongo: + srvMngConfigDIR = "apis_srvmng_mongo" + case utils.MetaMySQL: + srvMngConfigDIR = "apis_srvmng_mysql" + case utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("Unknown Database type") + } + for _, stest := range sTestsServManager { + t.Run(srvMngConfigDIR, stest) + } +} + +func testSrvMngInitCfg(t *testing.T) { + var err error + srvMngCfgPath = path.Join(*dataDir, "conf", "samples", srvMngConfigDIR) + srvMngCfg, err = config.NewCGRConfigFromPath(context.Background(), srvMngCfgPath) + if err != nil { + t.Error(err) + } +} + +func testSrvMngInitDataDb(t *testing.T) { + if err := engine.InitDataDB(srvMngCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func testSrvMngStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(srvMngCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testSrvMngSRPCConn(t *testing.T) { + var err error + srvMngRPC, err = newRPCClient(srvMngCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +// Kill the engine when it is about to be finished +func testSrvMngSKillEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} + +func testSrvMngPing(t *testing.T) { + var reply string + if err := srvMngRPC.Call(context.Background(), utils.ServiceManagerV1Ping, nil, + &reply); err != nil { + t.Error(err) + } else if reply != utils.Pong { + t.Errorf("Unexpected reply: %s", reply) + } + + serviceToMethod := map[string]string{ + utils.AdminS: utils.AdminSv1Ping, + utils.AccountS: utils.AccountSv1Ping, + utils.ActionS: utils.ActionSv1Ping, + utils.AnalyzerS: utils.AnalyzerSv1Ping, + utils.AttributeS: utils.AttributeSv1Ping, + utils.CDRServer: utils.CDRsV1Ping, + utils.ChargerS: utils.ChargerSv1Ping, + // utils.DispatcherS: utils.DispatcherSv1Ping, + utils.EEs: utils.EeSv1Ping, + utils.EFs: utils.EfSv1Ping, + utils.RateS: utils.RateSv1Ping, + utils.ResourceS: utils.ResourceSv1Ping, + utils.RouteS: utils.RouteSv1Ping, + utils.SessionS: utils.SessionSv1Ping, + utils.StatS: utils.StatSv1Ping, + utils.ThresholdS: utils.ThresholdSv1Ping, + utils.TPeS: utils.TPeSv1Ping, + } + + /* + Run the following tests for each service: + + - ping before enabling service (expect can't find service error) + - query for service status (expect "STOPPED" reply) + - start the service + - query for service status (expect "RUNNING" reply) + - ping (expect "Pong") + - stop service + - query for service status (expect "STOPPED" reply) + - ping after stopping service (expect "can't find service" error) + */ + + for serviceID, serviceMethod := range serviceToMethod { + t.Run(fmt.Sprintf("test for service %s", serviceID), func(t *testing.T) { + rpcErr := fmt.Sprintf("rpc: can't find service %s", serviceMethod) + if err := srvMngRPC.Call(context.Background(), serviceMethod, nil, + &reply); err == nil || err.Error() != rpcErr { + t.Errorf("expected: <%+v>,\nreceived: <%+v>", rpcErr, err) + } + + args := servmanager.ArgsServiceID{ + ServiceID: serviceID, + } + + if err := srvMngRPC.Call(context.Background(), utils.ServiceManagerV1ServiceStatus, args, + &reply); err != nil { + t.Error(err) + } else if reply != utils.StoppedCaps { + t.Errorf("Unexpected reply: %s", reply) + } + + if err := srvMngRPC.Call(context.Background(), utils.ServiceManagerV1StartService, args, + &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Unexpected reply: %s", reply) + } + + if err := srvMngRPC.Call(context.Background(), utils.ServiceManagerV1ServiceStatus, args, + &reply); err != nil { + t.Error(err) + } else if reply != utils.RunningCaps { + t.Errorf("Unexpected reply: %s", reply) + } + + if err := srvMngRPC.Call(context.Background(), serviceMethod, nil, + &reply); err != nil { + t.Error(err) + } else if reply != utils.Pong { + t.Errorf("Unexpected reply: %s", reply) + } + + if err := srvMngRPC.Call(context.Background(), utils.ServiceManagerV1StopService, args, + &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Unexpected reply: %s", reply) + } + + if err := srvMngRPC.Call(context.Background(), utils.ServiceManagerV1ServiceStatus, args, + &reply); err != nil { + t.Error(err) + } else if reply != utils.StoppedCaps { + t.Errorf("Unexpected reply: %s", reply) + } + + if err := srvMngRPC.Call(context.Background(), serviceMethod, nil, + &reply); err == nil || err.Error() != rpcErr { + t.Errorf("expected: <%+v>,\nreceived: <%+v>", rpcErr, err) + } + }) + } +} diff --git a/data/conf/samples/apis_srvmng_internal/cgrates.json b/data/conf/samples/apis_srvmng_internal/cgrates.json new file mode 100644 index 000000000..0bc0847c4 --- /dev/null +++ b/data/conf/samples/apis_srvmng_internal/cgrates.json @@ -0,0 +1,18 @@ +{ + +"general": { + "log_level": 7, + "reply_timeout": "50s" +}, + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080" +}, + +"data_db": { + "db_type": "*internal" +} + +} \ No newline at end of file diff --git a/data/conf/samples/apis_srvmng_mongo/cgrates.json b/data/conf/samples/apis_srvmng_mongo/cgrates.json new file mode 100644 index 000000000..bc400080b --- /dev/null +++ b/data/conf/samples/apis_srvmng_mongo/cgrates.json @@ -0,0 +1,20 @@ +{ + +"general": { + "log_level": 7, + "reply_timeout": "50s" +}, + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080" +}, + +"data_db": { + "db_type": "mongo", + "db_name": "10", + "db_port": 27017 +} + +} \ No newline at end of file diff --git a/data/conf/samples/apis_srvmng_mysql/cgrates.json b/data/conf/samples/apis_srvmng_mysql/cgrates.json new file mode 100644 index 000000000..2da9c5228 --- /dev/null +++ b/data/conf/samples/apis_srvmng_mysql/cgrates.json @@ -0,0 +1,20 @@ +{ + +"general": { + "log_level": 7, + "reply_timeout": "50s" +}, + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080" +}, + +"data_db": { + "db_type": "redis", + "db_port": 6379, + "db_name": "10" +} + +} \ No newline at end of file diff --git a/services/accounts_it_test.go b/services/accounts_it_test.go index 922a2ce8e..109a0e257 100644 --- a/services/accounts_it_test.go +++ b/services/accounts_it_test.go @@ -48,7 +48,7 @@ func TestAccountSReload(t *testing.T) { chSCh <- chS css := &CacheService{cacheCh: chSCh} server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) acctRPC := make(chan birpc.ClientConnector, 1) diff --git a/services/actions_it_test.go b/services/actions_it_test.go index 9ba424906..c97fe7482 100644 --- a/services/actions_it_test.go +++ b/services/actions_it_test.go @@ -48,7 +48,7 @@ func TestActionSReload(t *testing.T) { chSCh <- chS css := &CacheService{cacheCh: chSCh} server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) actRPC := make(chan birpc.ClientConnector, 1) diff --git a/services/analyzers_it_test.go b/services/analyzers_it_test.go index 8767aa03a..d5756f8a6 100644 --- a/services/analyzers_it_test.go +++ b/services/analyzers_it_test.go @@ -47,7 +47,7 @@ func TestAnalyzerSReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) anzRPC := make(chan birpc.ClientConnector, 1) diff --git a/services/asteriskagent_it_test.go b/services/asteriskagent_it_test.go index 30ded3d0a..5ed0a4115 100644 --- a/services/asteriskagent_it_test.go +++ b/services/asteriskagent_it_test.go @@ -49,7 +49,7 @@ func TestAsteriskAgentReload(t *testing.T) { shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) @@ -107,7 +107,7 @@ func TestAsteriskAgentReload2(t *testing.T) { shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) diff --git a/services/attributes_it_test.go b/services/attributes_it_test.go index e353ab776..67881dd5d 100644 --- a/services/attributes_it_test.go +++ b/services/attributes_it_test.go @@ -48,7 +48,7 @@ func TestAttributeSReload(t *testing.T) { chSCh <- chS css := &CacheService{cacheCh: chSCh} server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) attrRPC := make(chan birpc.ClientConnector, 1) diff --git a/services/cdrs_it_test.go b/services/cdrs_it_test.go index a93ab4042..d38dcf6b2 100644 --- a/services/cdrs_it_test.go +++ b/services/cdrs_it_test.go @@ -51,7 +51,7 @@ func TestCdrsReload(t *testing.T) { cfg.ChargerSCfg().Enabled = true server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) diff --git a/services/cgr-engine.go b/services/cgr-engine.go index 29d430746..92b9e975a 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -47,7 +47,7 @@ func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.Wai cM: cM, // connection manager caps: caps, // caps is used to limit RPC CPS shdWg: shdWg, // wait for shutdown - srvManager: servmanager.NewServiceManager(shdWg, cM, cfg.GetReloadChan()), + srvManager: servmanager.NewServiceManager(shdWg, cM, cfg), server: server, // Rpc/http server srvDep: map[string]*sync.WaitGroup{ utils.AnalyzerS: new(sync.WaitGroup), diff --git a/services/chargers_it_test.go b/services/chargers_it_test.go index 1f3d74705..c6740deec 100644 --- a/services/chargers_it_test.go +++ b/services/chargers_it_test.go @@ -53,7 +53,7 @@ func TestChargerSReload(t *testing.T) { filterSChan <- nil srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) attrS := NewAttributeService(cfg, db, css, filterSChan, server, make(chan birpc.ClientConnector, 1), anz, &DispatcherService{srvsReload: make(map[string]chan struct{})}, srvDep) diff --git a/services/cores_it_test.go b/services/cores_it_test.go index 5260fc615..20cd8c94a 100644 --- a/services/cores_it_test.go +++ b/services/cores_it_test.go @@ -42,7 +42,7 @@ func TestCoreSReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) coreRPC := make(chan birpc.ClientConnector, 1) diff --git a/services/datadb_it_test.go b/services/datadb_it_test.go index 9123d2560..337082630 100644 --- a/services/datadb_it_test.go +++ b/services/datadb_it_test.go @@ -50,7 +50,7 @@ func TestDataDBReload(t *testing.T) { css := &CacheService{cacheCh: chSCh} server := cores.NewServer(nil) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) cM := engine.NewConnManager(cfg) db := NewDataDBService(cfg, cM, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) diff --git a/services/diameteragent_it_test.go b/services/diameteragent_it_test.go index c2c9c6a9b..6f648a76a 100644 --- a/services/diameteragent_it_test.go +++ b/services/diameteragent_it_test.go @@ -44,7 +44,7 @@ func TestDiameterAgentReload1(t *testing.T) { shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), diff --git a/services/dispatchers_it_test.go b/services/dispatchers_it_test.go index a029482e5..981bd14ec 100644 --- a/services/dispatchers_it_test.go +++ b/services/dispatchers_it_test.go @@ -53,7 +53,7 @@ func TestDispatcherSReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go index e167e806b..06a7a4378 100644 --- a/services/dnsagent_it_test.go +++ b/services/dnsagent_it_test.go @@ -51,7 +51,7 @@ func TestDNSAgentReload(t *testing.T) { shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) diff --git a/services/ees_it_test.go b/services/ees_it_test.go index aabb759f9..89c5c7a2a 100644 --- a/services/ees_it_test.go +++ b/services/ees_it_test.go @@ -53,7 +53,7 @@ func TestEventExporterSReload(t *testing.T) { filterSChan <- nil shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) chS := engine.NewCacheS(cfg, nil, nil, nil) diff --git a/services/ers_it_test.go b/services/ers_it_test.go index a2b2e815e..233b2d7f6 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -60,7 +60,7 @@ func TestEventReaderSReload(t *testing.T) { }() shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) db := NewDataDBService(cfg, nil, srvDep) diff --git a/services/freeswitchagent_it_test.go b/services/freeswitchagent_it_test.go index 3b0326fc6..ec6e9c036 100644 --- a/services/freeswitchagent_it_test.go +++ b/services/freeswitchagent_it_test.go @@ -51,7 +51,7 @@ func TestFreeSwitchAgentReload(t *testing.T) { shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) diff --git a/services/httpagent_it_test.go b/services/httpagent_it_test.go index 108165bba..ccfeaa9ed 100644 --- a/services/httpagent_it_test.go +++ b/services/httpagent_it_test.go @@ -49,7 +49,7 @@ func TestHTTPAgentReload(t *testing.T) { }() shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) diff --git a/services/kamailioagent_it_test.go b/services/kamailioagent_it_test.go index 8f4ea5273..b06889b43 100644 --- a/services/kamailioagent_it_test.go +++ b/services/kamailioagent_it_test.go @@ -48,7 +48,7 @@ func TestKamailioAgentReload(t *testing.T) { shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) diff --git a/services/libcgr-engine.go b/services/libcgr-engine.go index 3a5af1e42..aa042c255 100644 --- a/services/libcgr-engine.go +++ b/services/libcgr-engine.go @@ -31,6 +31,7 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -195,11 +196,9 @@ func cgrInitGuardianSv1(iGuardianSCh chan birpc.ClientConnector, cfg *config.CGR func cgrInitServiceManagerV1(iServMngrCh chan birpc.ClientConnector, srvMngr *servmanager.ServiceManager, cfg *config.CGRConfig, server *cores.Server, anz *AnalyzerService) { - srv, _ := engine.NewServiceWithName(srvMngr, utils.ServiceManager, true) + srv, _ := birpc.NewService(apis.NewServiceManagerV1(srvMngr), utils.EmptyString, false) if !cfg.DispatcherSCfg().Enabled { - for _, s := range srv { - server.RpcRegister(s) - } + server.RpcRegister(srv) } iServMngrCh <- anz.GetInternalCodec(srv, utils.ServiceManager) } diff --git a/services/loaders_it_test.go b/services/loaders_it_test.go index 171533095..2d2d1602b 100644 --- a/services/loaders_it_test.go +++ b/services/loaders_it_test.go @@ -67,7 +67,7 @@ func TestLoaderSReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) diff --git a/services/radiusagent_it_test.go b/services/radiusagent_it_test.go index 33452c649..ecdcd0ee6 100644 --- a/services/radiusagent_it_test.go +++ b/services/radiusagent_it_test.go @@ -52,7 +52,7 @@ func TestRadiusAgentReload(t *testing.T) { shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) @@ -112,7 +112,7 @@ func TestRadiusAgentReload2(t *testing.T) { shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) diff --git a/services/rates_it_test.go b/services/rates_it_test.go index 68e77903c..5f1810e53 100644 --- a/services/rates_it_test.go +++ b/services/rates_it_test.go @@ -43,7 +43,7 @@ func TestRateSReload(t *testing.T) { filterSChan <- nil shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) chS := engine.NewCacheS(cfg, nil, nil, nil) diff --git a/services/registrarc_it_test.go b/services/registrarc_it_test.go index 5f86b14f5..a081ffb52 100644 --- a/services/registrarc_it_test.go +++ b/services/registrarc_it_test.go @@ -50,7 +50,7 @@ func TestDispatcherHReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) diff --git a/services/resources_it_test.go b/services/resources_it_test.go index 74aa878a3..0618607eb 100644 --- a/services/resources_it_test.go +++ b/services/resources_it_test.go @@ -55,7 +55,7 @@ func TestResourceSReload(t *testing.T) { chSCh <- chS css := &CacheService{cacheCh: chSCh} server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) db := NewDataDBService(cfg, nil, srvDep) diff --git a/services/routes_it_test.go b/services/routes_it_test.go index 37885932e..9a9cfc907 100644 --- a/services/routes_it_test.go +++ b/services/routes_it_test.go @@ -50,7 +50,7 @@ func TestRouteSReload(t *testing.T) { chSCh <- chS css := &CacheService{cacheCh: chSCh} server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) db := NewDataDBService(cfg, nil, srvDep) diff --git a/services/sipagent_it_test.go b/services/sipagent_it_test.go index 86b88c957..8be0db503 100644 --- a/services/sipagent_it_test.go +++ b/services/sipagent_it_test.go @@ -46,7 +46,7 @@ func TestSIPAgentReload(t *testing.T) { shdWg := new(sync.WaitGroup) server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) diff --git a/services/stats_it_test.go b/services/stats_it_test.go index 6deecccaa..fd79dc2d7 100644 --- a/services/stats_it_test.go +++ b/services/stats_it_test.go @@ -55,7 +55,7 @@ func TestStatSReload(t *testing.T) { chSCh <- chS css := &CacheService{cacheCh: chSCh} server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) db := NewDataDBService(cfg, nil, srvDep) diff --git a/services/thresholds_it_test.go b/services/thresholds_it_test.go index 531b48104..ec64e4e72 100644 --- a/services/thresholds_it_test.go +++ b/services/thresholds_it_test.go @@ -51,7 +51,7 @@ func TestThresholdSReload(t *testing.T) { chSCh <- chS css := &CacheService{cacheCh: chSCh} server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) db := NewDataDBService(cfg, nil, srvDep) @@ -120,7 +120,7 @@ func TestThresholdSReload2(t *testing.T) { chSCh <- chS css := &CacheService{cacheCh: chSCh} server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg.GetReloadChan()) + srvMngr := servmanager.NewServiceManager(shdWg, nil, cfg) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) db := NewDataDBService(cfg, nil, srvDep) diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 7614e4642..8e4f461db 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -19,6 +19,7 @@ along with this program. If not, see package servmanager import ( + "errors" "fmt" "sync" @@ -29,18 +30,20 @@ import ( ) // NewServiceManager returns a service manager -func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager, rldChan <-chan string) *ServiceManager { +func NewServiceManager(shdWg *sync.WaitGroup, connMgr *engine.ConnManager, cfg *config.CGRConfig) *ServiceManager { return &ServiceManager{ + cfg: cfg, subsystems: make(map[string]Service), shdWg: shdWg, connMgr: connMgr, - rldChan: rldChan, + rldChan: cfg.GetReloadChan(), } } // ServiceManager handles service management ran by the engine type ServiceManager struct { - sync.RWMutex // lock access to any shared data + sync.RWMutex // lock access to any shared data + cfg *config.CGRConfig subsystems map[string]Service // active subsystems managed by SM shdWg *sync.WaitGroup // list of shutdown items @@ -161,3 +164,143 @@ type Service interface { // ServiceName returns the service name ServiceName() string } + +// ArgsServiceID are passed to Start/Stop/Status RPC methods +type ArgsServiceID struct { + ServiceID string + APIOpts map[string]interface{} +} + +// V1StartService starts a service with ID +func (srvMngr *ServiceManager) V1StartService(ctx *context.Context, args *ArgsServiceID, reply *string) (err error) { + err = toggleService(args.ServiceID, true, srvMngr) + if err != nil { + return + } + *reply = utils.OK + return +} + +// V1StopService shuts-down a service with ID +func (srvMngr *ServiceManager) V1StopService(ctx *context.Context, args *ArgsServiceID, reply *string) (err error) { + err = toggleService(args.ServiceID, false, srvMngr) + if err != nil { + return + } + *reply = utils.OK + return +} + +// V1ServiceStatus returns the service status +func (srvMngr *ServiceManager) V1ServiceStatus(ctx *context.Context, args *ArgsServiceID, reply *string) error { + srvMngr.RLock() + defer srvMngr.RUnlock() + + srv := srvMngr.GetService(args.ServiceID) + if srv == nil { + return utils.ErrUnsupportedServiceID + } + running := srv.IsRunning() + + if running { + *reply = utils.RunningCaps + } else { + *reply = utils.StoppedCaps + } + return nil +} + +// GetConfig returns the Configuration +func (srvMngr *ServiceManager) GetConfig() *config.CGRConfig { + srvMngr.RLock() + defer srvMngr.RUnlock() + return srvMngr.cfg +} + +func toggleService(serviceID string, status bool, srvMngr *ServiceManager) (err error) { + srvMngr.Lock() + defer srvMngr.Unlock() + switch serviceID { + case utils.AccountS: + srvMngr.cfg.AccountSCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.ActionS: + srvMngr.cfg.ActionSCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.AdminS: + srvMngr.cfg.AdminSCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.AnalyzerS: + srvMngr.cfg.AnalyzerSCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.AttributeS: + srvMngr.cfg.AttributeSCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.CDRServer: + srvMngr.cfg.CdrsCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.ChargerS: + srvMngr.cfg.ChargerSCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.DispatcherS: + srvMngr.cfg.DispatcherSCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.EEs: + srvMngr.cfg.EEsCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.EFs: + srvMngr.cfg.EFsCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.ERs: + srvMngr.cfg.ERsCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + // case utils.LoaderS: + // srvMngr.cfg.LoaderCfg()[0].Enabled = status + // srvMngr.cfg.GetReloadChan() <- serviceID + case utils.RateS: + srvMngr.cfg.RateSCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.ResourceS: + srvMngr.cfg.ResourceSCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.RouteS: + srvMngr.cfg.RouteSCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.SessionS: + srvMngr.cfg.SessionSCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.StatS: + srvMngr.cfg.StatSCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.ThresholdS: + srvMngr.cfg.ThresholdSCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.TPeS: + srvMngr.cfg.TpeSCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.AsteriskAgent: + srvMngr.cfg.AsteriskAgentCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.DiameterAgent: + srvMngr.cfg.DiameterAgentCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.DNSAgent: + srvMngr.cfg.DNSAgentCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.FreeSWITCHAgent: + srvMngr.cfg.FsAgentCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.KamailioAgent: + srvMngr.cfg.KamAgentCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.RadiusAgent: + srvMngr.cfg.RadiusAgentCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + case utils.SIPAgent: + srvMngr.cfg.SIPAgentCfg().Enabled = status + srvMngr.cfg.GetReloadChan() <- serviceID + default: + err = errors.New(utils.UnsupportedServiceIDCaps) + } + return +} diff --git a/utils/errors.go b/utils/errors.go index f605817b8..1e01d2152 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -79,6 +79,7 @@ var ( ErrNegative = errors.New("NEGATIVE") ErrUnsupportedTPExporterType = errors.New("UNSUPPORTED_TPEXPORTER_TYPE") ErrCastFailed = errors.New("CAST_FAILED") + ErrUnsupportedServiceID = errors.New(UnsupportedServiceIDCaps) ErrMap = map[string]error{ ErrNoMoreData.Error(): ErrNoMoreData,