Implemented Shutdown API and renamed cmd/cgr- test files

This commit is contained in:
nickolasdaniel
2021-06-11 16:34:13 +03:00
committed by Dan Christian Bogos
parent ee556310c8
commit 8748ad29c8
11 changed files with 166 additions and 21 deletions

View File

@@ -43,3 +43,9 @@ func (cS *CoreSv1) Sleep(_ *context.Context, arg *utils.DurationArgs, reply *str
*reply = utils.OK
return nil
}
func (cS *CoreSv1) Shutdown(_ *context.Context, _ *utils.CGREvent, reply *string) error {
cS.cS.ShutdownEngine()
*reply = utils.OK
return nil
}

114
apis/cores_it_test.go Normal file
View File

@@ -0,0 +1,114 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package apis
import (
"path"
"testing"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
coreItCfgPath string
coreItDirPath string
coreItCfg *config.CGRConfig
coreItBiRPC *birpc.Client
coreItTests = []func(t *testing.T){
testCoreItLoadConfig,
testCoreItInitDataDb,
testCoreItInitStorDb,
testCoreItStartEngine,
testCoreItRpcConn,
testCoreItStatus,
testCoreItKillEngine,
}
)
func TestCoreItTests(t *testing.T) {
switch *dbType {
case utils.MetaInternal:
coreItDirPath = "all2"
case utils.MetaMongo:
coreItDirPath = "all2_mongo"
case utils.MetaMySQL:
coreItDirPath = "all2_mysql"
default:
t.Fatalf("Unsupported database")
}
for _, test := range coreItTests {
t.Run("Running integration tests", test)
}
}
func testCoreItLoadConfig(t *testing.T) {
var err error
coreItCfgPath = path.Join(*dataDir, "conf", "samples", "dispatchers", coreItDirPath)
if coreItCfg, err = config.NewCGRConfigFromPath(coreItCfgPath); err != nil {
t.Fatal(err)
}
}
func testCoreItInitDataDb(t *testing.T) {
if err := engine.InitDataDB(coreItCfg); err != nil {
t.Fatal(err)
}
}
func testCoreItInitStorDb(t *testing.T) {
if err := engine.InitStorDB(coreItCfg); err != nil {
t.Fatal(err)
}
}
func testCoreItStartEngine(t *testing.T) {
if _, err := engine.StartEngine(coreItCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func testCoreItRpcConn(t *testing.T) {
var err error
if coreItBiRPC, err = newRPCClient(coreItCfg.ListenCfg()); err != nil {
t.Fatal(err)
}
}
func testCoreItStatus(t *testing.T) {
args := &utils.TenantIDWithAPIOpts{}
var reply map[string]interface{}
if err := coreItBiRPC.Call(context.Background(), utils.CoreSv1Status,
args, &reply); err != nil {
t.Fatal(err)
} else if reply[utils.NodeID] != "ALL2" {
t.Errorf("Expected ALL2 but received %v", reply[utils.NodeID])
}
}
func testCoreItKillEngine(t *testing.T) {
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)
}
}

View File

@@ -29,7 +29,7 @@ import (
func TestCoreSStatus(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
caps := engine.NewCaps(2, utils.MetaTopUp)
coreService := cores.NewCoreService(cfg, caps, make(chan struct{}))
coreService := cores.NewCoreService(cfg, caps, make(chan struct{}), nil)
cS := NewCoreSv1(coreService)
arg := &utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
@@ -44,7 +44,7 @@ func TestCoreSStatus(t *testing.T) {
func TestCoreSSleep(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
caps := engine.NewCaps(2, utils.MetaTopUp)
coreService := cores.NewCoreService(cfg, caps, make(chan struct{}))
coreService := cores.NewCoreService(cfg, caps, make(chan struct{}), nil)
cS := NewCoreSv1(coreService)
arg := &utils.DurationArgs{
Duration: 3 * time.Second,
@@ -55,5 +55,19 @@ func TestCoreSSleep(t *testing.T) {
} else if reply != "OK" {
t.Errorf("Expected OK, reveived %+v", reply)
}
}
func TestCoreSShutdown(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
caps := engine.NewCaps(2, utils.MetaTopUp)
shdChan := utils.NewSyncedChan()
coreService := cores.NewCoreService(cfg, caps, make(chan struct{}), shdChan)
cS := NewCoreSv1(coreService)
arg := &utils.CGREvent{}
var reply string
if err := cS.Shutdown(context.Background(), arg, &reply); err != nil {
t.Error(err)
} else if reply != "OK" {
t.Errorf("Expected OK, received %+v", reply)
}
}

View File

@@ -616,7 +616,7 @@ func main() {
}
// init CoreSv1
coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, srvDep)
coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, srvDep, shdChan)
shdWg.Add(1)
if err := coreS.Start(); err != nil {
fmt.Println(err)

View File

@@ -27,7 +27,7 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, stopChan chan struct{}) *CoreService {
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, stopChan chan struct{}, shdEngine *utils.SyncedChan) *CoreService {
var st *engine.CapsStats
if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 {
st = engine.NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan)
@@ -35,12 +35,18 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, stopChan chan stru
return &CoreService{
cfg: cfg,
CapsStats: st,
shdEngine: shdEngine,
}
}
type CoreService struct {
cfg *config.CGRConfig
CapsStats *engine.CapsStats
shdEngine *utils.SyncedChan
}
func (cS *CoreService) ShutdownEngine() {
cS.shdEngine.CloseOnce()
}
// Shutdown is called to shutdown the service

View File

@@ -33,20 +33,23 @@ func TestNewCoreService(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
cfgDflt.CoreSCfg().CapsStatsInterval = time.Second
stopchan := make(chan struct{}, 1)
shdChan := utils.NewSyncedChan()
caps := engine.NewCaps(1, utils.MetaBusy)
sts := engine.NewCapsStats(cfgDflt.CoreSCfg().CapsStatsInterval, caps, stopchan)
expected := &CoreService{
cfg: cfgDflt,
CapsStats: sts,
shdEngine: shdChan,
}
rcv := NewCoreService(cfgDflt, caps, stopchan)
rcv := NewCoreService(cfgDflt, caps, stopchan, shdChan)
if !reflect.DeepEqual(expected, rcv) {
t.Errorf("Expected %+v, received %+v", utils.ToJSON(expected), utils.ToJSON(rcv))
}
close(stopchan)
//shut down the service
rcv.Shutdown()
rcv.ShutdownEngine()
}
func TestCoreServiceStatus(t *testing.T) {
@@ -55,7 +58,7 @@ func TestCoreServiceStatus(t *testing.T) {
caps := engine.NewCaps(1, utils.MetaBusy)
stopChan := make(chan struct{}, 1)
cores := NewCoreService(cfgDflt, caps, stopChan)
cores := NewCoreService(cfgDflt, caps, stopChan, nil)
args := &utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{},

View File

@@ -33,24 +33,26 @@ import (
// NewCoreService returns the Core Service
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *cores.Server,
internalCoreSChan chan birpc.ClientConnector, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) *CoreService {
srvDep map[string]*sync.WaitGroup, shdEngine *utils.SyncedChan) *CoreService {
return &CoreService{
connChan: internalCoreSChan,
cfg: cfg,
caps: caps,
server: server,
anz: anz,
srvDep: srvDep,
connChan: internalCoreSChan,
cfg: cfg,
caps: caps,
server: server,
anz: anz,
srvDep: srvDep,
shdEngine: shdEngine,
}
}
// CoreService implements Service interface
type CoreService struct {
sync.RWMutex
cfg *config.CGRConfig
server *cores.Server
caps *engine.Caps
stopChan chan struct{}
cfg *config.CGRConfig
server *cores.Server
caps *engine.Caps
stopChan chan struct{}
shdEngine *utils.SyncedChan
cS *cores.CoreService
rpc *apis.CoreSv1
@@ -69,7 +71,7 @@ func (cS *CoreService) Start() (err error) {
defer cS.Unlock()
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CoreS))
cS.stopChan = make(chan struct{})
cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.stopChan)
cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.stopChan, cS.shdEngine)
cS.rpc = apis.NewCoreSv1(cS.cS)
srv, _ := birpc.NewService(cS.rpc, utils.EmptyString, false)
if !cS.cfg.DispatcherSCfg().Enabled {

View File

@@ -51,7 +51,7 @@ func TestCoreSReload(t *testing.T) {
coreRPC := make(chan birpc.ClientConnector, 1)
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
caps := engine.NewCaps(1, "test_caps")
coreS := NewCoreService(cfg, caps, server, coreRPC, anz, srvDep)
coreS := NewCoreService(cfg, caps, server, coreRPC, anz, srvDep, shdChan)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(coreS,
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)

View File

@@ -41,7 +41,7 @@ func TestCoreSCoverage(t *testing.T) {
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
srv := NewCoreService(cfg, caps, server,
internalCoreSChan, anz, srvDep)
internalCoreSChan, anz, srvDep, shdChan)
if srv == nil {
t.Errorf("\nExpecting <nil>,\n Received <%+v>", utils.ToJSON(srv))
}