diff --git a/apis/cores.go b/apis/cores.go index bd092d14a..db40597ef 100644 --- a/apis/cores.go +++ b/apis/cores.go @@ -43,3 +43,9 @@ func (cS *CoreSv1) Sleep(_ *context.Context, arg *utils.DurationArgs, reply *str *reply = utils.OK return nil } + +func (cS *CoreSv1) Shutdown(_ *context.Context, _ *utils.CGREvent, reply *string) error { + cS.cS.ShutdownEngine() + *reply = utils.OK + return nil +} diff --git a/apis/cores_it_test.go b/apis/cores_it_test.go new file mode 100644 index 000000000..32bd9d2a2 --- /dev/null +++ b/apis/cores_it_test.go @@ -0,0 +1,114 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package apis + +import ( + "path" + "testing" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + coreItCfgPath string + coreItDirPath string + coreItCfg *config.CGRConfig + coreItBiRPC *birpc.Client + coreItTests = []func(t *testing.T){ + testCoreItLoadConfig, + testCoreItInitDataDb, + testCoreItInitStorDb, + testCoreItStartEngine, + testCoreItRpcConn, + testCoreItStatus, + testCoreItKillEngine, + } +) + +func TestCoreItTests(t *testing.T) { + switch *dbType { + case utils.MetaInternal: + coreItDirPath = "all2" + case utils.MetaMongo: + coreItDirPath = "all2_mongo" + case utils.MetaMySQL: + coreItDirPath = "all2_mysql" + default: + t.Fatalf("Unsupported database") + } + for _, test := range coreItTests { + t.Run("Running integration tests", test) + } +} + +func testCoreItLoadConfig(t *testing.T) { + var err error + coreItCfgPath = path.Join(*dataDir, "conf", "samples", "dispatchers", coreItDirPath) + if coreItCfg, err = config.NewCGRConfigFromPath(coreItCfgPath); err != nil { + t.Fatal(err) + } +} + +func testCoreItInitDataDb(t *testing.T) { + if err := engine.InitDataDB(coreItCfg); err != nil { + t.Fatal(err) + } +} + +func testCoreItInitStorDb(t *testing.T) { + if err := engine.InitStorDB(coreItCfg); err != nil { + t.Fatal(err) + } +} + +func testCoreItStartEngine(t *testing.T) { + if _, err := engine.StartEngine(coreItCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testCoreItRpcConn(t *testing.T) { + var err error + if coreItBiRPC, err = newRPCClient(coreItCfg.ListenCfg()); err != nil { + t.Fatal(err) + } +} + +func testCoreItStatus(t *testing.T) { + args := &utils.TenantIDWithAPIOpts{} + var reply map[string]interface{} + if err := coreItBiRPC.Call(context.Background(), utils.CoreSv1Status, + args, &reply); err != nil { + t.Fatal(err) + } else if reply[utils.NodeID] != "ALL2" { + t.Errorf("Expected ALL2 but received %v", reply[utils.NodeID]) + } +} + +func testCoreItKillEngine(t *testing.T) { + if err := engine.KillEngine(*waitRater); err != nil { + t.Error(err) + } +} diff --git a/apis/cores_test.go b/apis/cores_test.go index b50832139..e792adcb1 100644 --- a/apis/cores_test.go +++ b/apis/cores_test.go @@ -29,7 +29,7 @@ import ( func TestCoreSStatus(t *testing.T) { cfg := config.NewDefaultCGRConfig() caps := engine.NewCaps(2, utils.MetaTopUp) - coreService := cores.NewCoreService(cfg, caps, make(chan struct{})) + coreService := cores.NewCoreService(cfg, caps, make(chan struct{}), nil) cS := NewCoreSv1(coreService) arg := &utils.TenantWithAPIOpts{ Tenant: "cgrates.org", @@ -44,7 +44,7 @@ func TestCoreSStatus(t *testing.T) { func TestCoreSSleep(t *testing.T) { cfg := config.NewDefaultCGRConfig() caps := engine.NewCaps(2, utils.MetaTopUp) - coreService := cores.NewCoreService(cfg, caps, make(chan struct{})) + coreService := cores.NewCoreService(cfg, caps, make(chan struct{}), nil) cS := NewCoreSv1(coreService) arg := &utils.DurationArgs{ Duration: 3 * time.Second, @@ -55,5 +55,19 @@ func TestCoreSSleep(t *testing.T) { } else if reply != "OK" { t.Errorf("Expected OK, reveived %+v", reply) } - +} + +func TestCoreSShutdown(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + caps := engine.NewCaps(2, utils.MetaTopUp) + shdChan := utils.NewSyncedChan() + coreService := cores.NewCoreService(cfg, caps, make(chan struct{}), shdChan) + cS := NewCoreSv1(coreService) + arg := &utils.CGREvent{} + var reply string + if err := cS.Shutdown(context.Background(), arg, &reply); err != nil { + t.Error(err) + } else if reply != "OK" { + t.Errorf("Expected OK, received %+v", reply) + } } diff --git a/cmd/cgr-console/cgr-console_flags_test.go b/cmd/cgr-console/flags_test.go similarity index 100% rename from cmd/cgr-console/cgr-console_flags_test.go rename to cmd/cgr-console/flags_test.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index bc4fe42fd..67d09a8c6 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -616,7 +616,7 @@ func main() { } // init CoreSv1 - coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, srvDep) + coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, srvDep, shdChan) shdWg.Add(1) if err := coreS.Start(); err != nil { fmt.Println(err) diff --git a/cmd/cgr-engine/cgr-engine_flags_test.go b/cmd/cgr-engine/flags_test.go similarity index 100% rename from cmd/cgr-engine/cgr-engine_flags_test.go rename to cmd/cgr-engine/flags_test.go diff --git a/cores/core.go b/cores/core.go index 74b561760..a95013065 100644 --- a/cores/core.go +++ b/cores/core.go @@ -27,7 +27,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, stopChan chan struct{}) *CoreService { +func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, stopChan chan struct{}, shdEngine *utils.SyncedChan) *CoreService { var st *engine.CapsStats if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 { st = engine.NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan) @@ -35,12 +35,18 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, stopChan chan stru return &CoreService{ cfg: cfg, CapsStats: st, + shdEngine: shdEngine, } } type CoreService struct { cfg *config.CGRConfig CapsStats *engine.CapsStats + shdEngine *utils.SyncedChan +} + +func (cS *CoreService) ShutdownEngine() { + cS.shdEngine.CloseOnce() } // Shutdown is called to shutdown the service diff --git a/cores/core_test.go b/cores/core_test.go index 0d278188d..682dd15ca 100644 --- a/cores/core_test.go +++ b/cores/core_test.go @@ -33,20 +33,23 @@ func TestNewCoreService(t *testing.T) { cfgDflt := config.NewDefaultCGRConfig() cfgDflt.CoreSCfg().CapsStatsInterval = time.Second stopchan := make(chan struct{}, 1) + shdChan := utils.NewSyncedChan() caps := engine.NewCaps(1, utils.MetaBusy) sts := engine.NewCapsStats(cfgDflt.CoreSCfg().CapsStatsInterval, caps, stopchan) expected := &CoreService{ cfg: cfgDflt, CapsStats: sts, + shdEngine: shdChan, } - rcv := NewCoreService(cfgDflt, caps, stopchan) + rcv := NewCoreService(cfgDflt, caps, stopchan, shdChan) if !reflect.DeepEqual(expected, rcv) { t.Errorf("Expected %+v, received %+v", utils.ToJSON(expected), utils.ToJSON(rcv)) } close(stopchan) //shut down the service rcv.Shutdown() + rcv.ShutdownEngine() } func TestCoreServiceStatus(t *testing.T) { @@ -55,7 +58,7 @@ func TestCoreServiceStatus(t *testing.T) { caps := engine.NewCaps(1, utils.MetaBusy) stopChan := make(chan struct{}, 1) - cores := NewCoreService(cfgDflt, caps, stopChan) + cores := NewCoreService(cfgDflt, caps, stopChan, nil) args := &utils.TenantWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{}, diff --git a/services/cores.go b/services/cores.go index 4cd9c2171..ae9c2e0dd 100644 --- a/services/cores.go +++ b/services/cores.go @@ -33,24 +33,26 @@ import ( // NewCoreService returns the Core Service func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *cores.Server, internalCoreSChan chan birpc.ClientConnector, anz *AnalyzerService, - srvDep map[string]*sync.WaitGroup) *CoreService { + srvDep map[string]*sync.WaitGroup, shdEngine *utils.SyncedChan) *CoreService { return &CoreService{ - connChan: internalCoreSChan, - cfg: cfg, - caps: caps, - server: server, - anz: anz, - srvDep: srvDep, + connChan: internalCoreSChan, + cfg: cfg, + caps: caps, + server: server, + anz: anz, + srvDep: srvDep, + shdEngine: shdEngine, } } // CoreService implements Service interface type CoreService struct { sync.RWMutex - cfg *config.CGRConfig - server *cores.Server - caps *engine.Caps - stopChan chan struct{} + cfg *config.CGRConfig + server *cores.Server + caps *engine.Caps + stopChan chan struct{} + shdEngine *utils.SyncedChan cS *cores.CoreService rpc *apis.CoreSv1 @@ -69,7 +71,7 @@ func (cS *CoreService) Start() (err error) { defer cS.Unlock() utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CoreS)) cS.stopChan = make(chan struct{}) - cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.stopChan) + cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.stopChan, cS.shdEngine) cS.rpc = apis.NewCoreSv1(cS.cS) srv, _ := birpc.NewService(cS.rpc, utils.EmptyString, false) if !cS.cfg.DispatcherSCfg().Enabled { diff --git a/services/cores_it_test.go b/services/cores_it_test.go index 29344e0cd..a91eb50eb 100644 --- a/services/cores_it_test.go +++ b/services/cores_it_test.go @@ -51,7 +51,7 @@ func TestCoreSReload(t *testing.T) { coreRPC := make(chan birpc.ClientConnector, 1) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) caps := engine.NewCaps(1, "test_caps") - coreS := NewCoreService(cfg, caps, server, coreRPC, anz, srvDep) + coreS := NewCoreService(cfg, caps, server, coreRPC, anz, srvDep, shdChan) engine.NewConnManager(cfg, nil) srvMngr.AddServices(coreS, NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db) diff --git a/services/cores_test.go b/services/cores_test.go index 1ba42380a..f9a5d8506 100644 --- a/services/cores_test.go +++ b/services/cores_test.go @@ -41,7 +41,7 @@ func TestCoreSCoverage(t *testing.T) { srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) srv := NewCoreService(cfg, caps, server, - internalCoreSChan, anz, srvDep) + internalCoreSChan, anz, srvDep, shdChan) if srv == nil { t.Errorf("\nExpecting ,\n Received <%+v>", utils.ToJSON(srv)) }