mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Refactoring and tests in services
This commit is contained in:
committed by
Dan Christian Bogos
parent
aeb0b30c2e
commit
2edfd207d2
@@ -345,12 +345,11 @@ type ResourceService struct {
|
||||
}
|
||||
|
||||
// Shutdown is called to shutdown the service
|
||||
func (rS *ResourceService) Shutdown() error {
|
||||
func (rS *ResourceService) Shutdown() {
|
||||
utils.Logger.Info("<ResourceS> service shutdown initialized")
|
||||
close(rS.stopBackup)
|
||||
rS.storeResources()
|
||||
utils.Logger.Info("<ResourceS> service shutdown complete")
|
||||
return nil
|
||||
}
|
||||
|
||||
// StoreResource stores the resource in DB and corrects dirty flag
|
||||
|
||||
@@ -136,10 +136,9 @@ type RouteService struct {
|
||||
}
|
||||
|
||||
// Shutdown is called to shutdown the service
|
||||
func (rpS *RouteService) Shutdown() error {
|
||||
func (rpS *RouteService) Shutdown() {
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.RouteS))
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.RouteS))
|
||||
return nil
|
||||
}
|
||||
|
||||
// matchingRouteProfilesForEvent returns ordered list of matching resources which are active by the time of the call
|
||||
|
||||
@@ -58,9 +58,8 @@ func (rS *RateS) ListenAndServe(stopChan, cfgRld chan struct{}) {
|
||||
}
|
||||
|
||||
// Shutdown is called to shutdown the service
|
||||
func (rS *RateS) Shutdown() (err error) {
|
||||
func (rS *RateS) Shutdown() {
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.RateS))
|
||||
return
|
||||
}
|
||||
|
||||
// Call implements rpcclient.ClientConnector interface for internal RPC
|
||||
|
||||
@@ -84,15 +84,18 @@ func (kam *KamailioAgent) Reload() (err error) {
|
||||
return
|
||||
}
|
||||
kam.kam.Reload()
|
||||
go func(k *agents.KamailioAgent) {
|
||||
if err = k.Connect(); err != nil {
|
||||
if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log
|
||||
return
|
||||
}
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err))
|
||||
kam.shdChan.CloseOnce()
|
||||
go kam.reload(kam.kam)
|
||||
return
|
||||
}
|
||||
|
||||
func (kam *KamailioAgent) reload(k *agents.KamailioAgent) (err error) {
|
||||
if err = k.Connect(); err != nil {
|
||||
if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log
|
||||
return
|
||||
}
|
||||
}(kam.kam)
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err))
|
||||
kam.shdChan.CloseOnce()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -100,9 +103,7 @@ func (kam *KamailioAgent) Reload() (err error) {
|
||||
func (kam *KamailioAgent) Shutdown() (err error) {
|
||||
kam.Lock()
|
||||
defer kam.Unlock()
|
||||
if err = kam.kam.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
err = kam.kam.Shutdown()
|
||||
kam.kam = nil
|
||||
return
|
||||
}
|
||||
|
||||
@@ -141,3 +141,54 @@ func testCleanupFiles(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoaderSReload2(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
for _, ld := range cfg.LoaderCfg() {
|
||||
ld.Enabled = false
|
||||
}
|
||||
shdChan := utils.NewSyncedChan()
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
server := cores.NewServer(nil)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
db.dbchan <- new(engine.DataManager)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
|
||||
srv := NewLoaderService(cfg, db, filterSChan,
|
||||
server, make(chan rpcclient.ClientConnector, 1),
|
||||
nil, anz, srvDep)
|
||||
err := srv.Start()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoaderSReload3(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
for _, ld := range cfg.LoaderCfg() {
|
||||
ld.Enabled = false
|
||||
}
|
||||
cfg.LoaderCfg()[0].Enabled = true
|
||||
cfg.LoaderCfg()[0].TpInDir = "/tmp/TestLoaderSReload3"
|
||||
cfg.LoaderCfg()[0].RunDelay = -1
|
||||
shdChan := utils.NewSyncedChan()
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
server := cores.NewServer(nil)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, srvDep)
|
||||
db.dbchan <- new(engine.DataManager)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
|
||||
srv := NewLoaderService(cfg, db, filterSChan,
|
||||
server, make(chan rpcclient.ClientConnector, 1),
|
||||
nil, anz, srvDep)
|
||||
err := srv.Start()
|
||||
if err == nil || err.Error() != "no such file or directory" {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "no such file or directory", err)
|
||||
}
|
||||
err = srv.Reload()
|
||||
if err == nil || err.Error() != "no such file or directory" {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "no such file or directory", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,13 +82,7 @@ func (rals *RalService) Start() (err error) {
|
||||
<-rals.cacheS.GetPrecacheChannel(utils.CacheActionTriggers)
|
||||
<-rals.cacheS.GetPrecacheChannel(utils.CacheSharedGroups)
|
||||
<-rals.cacheS.GetPrecacheChannel(utils.CacheTimings)
|
||||
|
||||
if err = rals.responder.Start(); err != nil {
|
||||
if err != utils.ErrServiceAlreadyRunning {
|
||||
return
|
||||
}
|
||||
err = nil
|
||||
}
|
||||
rals.responder.Start() //we don't verify the error because responder.Start() always returns service already running
|
||||
|
||||
rals.rals = v1.NewRALsV1()
|
||||
|
||||
@@ -103,9 +97,7 @@ func (rals *RalService) Start() (err error) {
|
||||
// Reload handles the change of config
|
||||
func (rals *RalService) Reload() (err error) {
|
||||
engine.SetRpSubjectPrefixMatching(rals.cfg.RalsCfg().RpSubjectPrefixMatching)
|
||||
if err = rals.responder.Reload(); err != nil {
|
||||
return
|
||||
}
|
||||
rals.responder.Reload() //we don't verify the error because responder.Reload never returns an error
|
||||
return
|
||||
}
|
||||
|
||||
@@ -113,9 +105,7 @@ func (rals *RalService) Reload() (err error) {
|
||||
func (rals *RalService) Shutdown() (err error) {
|
||||
rals.Lock()
|
||||
defer rals.Unlock()
|
||||
if err = rals.responder.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
err = rals.responder.Shutdown() //we don't verify the error because responder.Reload never returns an error
|
||||
rals.rals = nil
|
||||
<-rals.connChan
|
||||
return
|
||||
|
||||
@@ -130,3 +130,46 @@ func TestRalsReload(t *testing.T) {
|
||||
shdChan.CloseOnce()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
func TestRalsReload2(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
|
||||
utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID)
|
||||
utils.Logger.SetLogLevel(7)
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
shdChan := utils.NewSyncedChan()
|
||||
chS := engine.NewCacheS(cfg, nil, nil)
|
||||
close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles))
|
||||
close(chS.GetPrecacheChannel(utils.CacheThresholds))
|
||||
close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes))
|
||||
close(chS.GetPrecacheChannel(utils.CacheDestinations))
|
||||
close(chS.GetPrecacheChannel(utils.CacheReverseDestinations))
|
||||
close(chS.GetPrecacheChannel(utils.CacheRatingPlans))
|
||||
close(chS.GetPrecacheChannel(utils.CacheRatingProfiles))
|
||||
close(chS.GetPrecacheChannel(utils.CacheActions))
|
||||
close(chS.GetPrecacheChannel(utils.CacheActionPlans))
|
||||
close(chS.GetPrecacheChannel(utils.CacheAccountActionPlans))
|
||||
close(chS.GetPrecacheChannel(utils.CacheActionTriggers))
|
||||
close(chS.GetPrecacheChannel(utils.CacheSharedGroups))
|
||||
close(chS.GetPrecacheChannel(utils.CacheTimings))
|
||||
|
||||
cfg.ThresholdSCfg().Enabled = true
|
||||
server := cores.NewServer(nil)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
cfg.StorDbCfg().Type = utils.INTERNAL
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
|
||||
ralS := NewRalService(cfg, chS, server,
|
||||
make(chan rpcclient.ClientConnector, 1),
|
||||
make(chan rpcclient.ClientConnector, 1),
|
||||
shdChan, nil, anz, srvDep)
|
||||
ralS.responder.resp = &engine.Responder{
|
||||
ShdChan: shdChan,
|
||||
Timeout: 0,
|
||||
Timezone: "",
|
||||
MaxComputedUsage: nil,
|
||||
}
|
||||
err := ralS.Start()
|
||||
if err != nil {
|
||||
t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", nil, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,9 +101,7 @@ func (rs *RateService) Shutdown() (err error) {
|
||||
rs.Lock()
|
||||
defer rs.Unlock()
|
||||
close(rs.stopChan)
|
||||
if err = rs.rateS.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
rs.rateS.Shutdown() //we don't verify the error because shutdown never returns an err
|
||||
rs.rateS = nil
|
||||
<-rs.intConnChan
|
||||
return
|
||||
|
||||
@@ -109,9 +109,7 @@ func (reS *ResourceService) Shutdown() (err error) {
|
||||
defer reS.srvDep[utils.DataDB].Done()
|
||||
reS.Lock()
|
||||
defer reS.Unlock()
|
||||
if err = reS.reS.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
reS.reS.Shutdown() //we don't verify the error because shutdown never returns an error
|
||||
reS.reS = nil
|
||||
reS.rpc = nil
|
||||
<-reS.connChan
|
||||
|
||||
@@ -104,9 +104,7 @@ func (routeS *RouteService) Reload() (err error) {
|
||||
func (routeS *RouteService) Shutdown() (err error) {
|
||||
routeS.Lock()
|
||||
defer routeS.Unlock()
|
||||
if err = routeS.routeS.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
routeS.routeS.Shutdown() //we don't verify the error because shutdown never returns an error
|
||||
routeS.routeS = nil
|
||||
routeS.rpc = nil
|
||||
<-routeS.connChan
|
||||
|
||||
Reference in New Issue
Block a user