mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-20 06:38:45 +05:00
fix birpc nil panic on service restart
StopBiRPC made birpcSrv nil, so next Start panicked on register. Unregister the name instead so clients stay connected.
This commit is contained in:
committed by
Dan Christian Bogos
parent
7babf7b725
commit
7983a319db
@@ -59,10 +59,9 @@ type SessionService struct {
|
||||
sm *sessions.SessionS
|
||||
connChan chan birpc.ClientConnector
|
||||
|
||||
birpcEnabled bool
|
||||
connMgr *engine.ConnManager
|
||||
anz *AnalyzerService
|
||||
srvDep map[string]*sync.WaitGroup
|
||||
connMgr *engine.ConnManager
|
||||
anz *AnalyzerService
|
||||
srvDep map[string]*sync.WaitGroup
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
@@ -110,8 +109,7 @@ func (smg *SessionService) Start() error {
|
||||
smg.server.RpcRegister(legacySrv)
|
||||
}
|
||||
// Register BiRpc handlers
|
||||
if smg.cfg.ListenCfg().BiJSONListen != utils.EmptyString {
|
||||
smg.birpcEnabled = true
|
||||
if smg.cfg.ListenCfg().BiJSONListen != "" {
|
||||
smg.server.BiRPCRegisterName(utils.SessionSv1, srv)
|
||||
}
|
||||
return nil
|
||||
@@ -129,15 +127,14 @@ func (smg *SessionService) Shutdown() (err error) {
|
||||
defer smg.Unlock()
|
||||
close(smg.stopChan)
|
||||
if err = smg.sm.Shutdown(); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
if smg.birpcEnabled {
|
||||
smg.server.StopBiRPC()
|
||||
smg.birpcEnabled = false
|
||||
if smg.cfg.ListenCfg().BiJSONListen != "" {
|
||||
smg.server.BiRPCUnregisterName(utils.SessionSv1)
|
||||
}
|
||||
smg.sm = nil
|
||||
<-smg.connChan
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
|
||||
@@ -125,3 +125,44 @@ func TestSessionServiceStartBiRPC(t *testing.T) {
|
||||
t.Error("expected service to be running after Start()")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSessionServiceRestart(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
shdChan := utils.NewSyncedChan()
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
server := cores.NewServer(nil)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
db := NewDataDBService(cfg, nil, false, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
|
||||
connChan := make(chan birpc.ClientConnector, 1)
|
||||
smg := NewSessionService(cfg, db, server, connChan, nil, anz, srvDep)
|
||||
|
||||
if err := smg.Start(); err != nil {
|
||||
t.Fatalf("first Start() error: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
if smg.IsRunning() {
|
||||
_ = smg.Shutdown()
|
||||
}
|
||||
})
|
||||
if !smg.IsRunning() {
|
||||
t.Fatal("expected service to be running after first Start()")
|
||||
}
|
||||
|
||||
if err := smg.Shutdown(); err != nil {
|
||||
t.Fatalf("Shutdown() error: %v", err)
|
||||
}
|
||||
if smg.IsRunning() {
|
||||
t.Fatal("expected service to be stopped after Shutdown()")
|
||||
}
|
||||
|
||||
if err := smg.Start(); err != nil {
|
||||
t.Fatalf("second Start() error: %v", err)
|
||||
}
|
||||
if !smg.IsRunning() {
|
||||
t.Fatal("expected service to be running after second Start()")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,13 +52,12 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
// ThresholdService implements Service interface
|
||||
type ThresholdService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
dm *DataDBService
|
||||
cacheS *engine.CacheS
|
||||
filterSChan chan *engine.FilterS
|
||||
server *cores.Server
|
||||
birpcEnabled bool
|
||||
connMgr *engine.ConnManager
|
||||
cfg *config.CGRConfig
|
||||
dm *DataDBService
|
||||
cacheS *engine.CacheS
|
||||
filterSChan chan *engine.FilterS
|
||||
server *cores.Server
|
||||
connMgr *engine.ConnManager
|
||||
|
||||
thrs *engine.ThresholdService
|
||||
connChan chan birpc.ClientConnector
|
||||
@@ -99,7 +98,6 @@ func (thrs *ThresholdService) Start() error {
|
||||
thrs.connChan <- thrs.anz.GetInternalCodec(srv, utils.ThresholdS)
|
||||
// Register BiRpc handlers
|
||||
if thrs.cfg.ListenCfg().BiJSONListen != "" {
|
||||
thrs.birpcEnabled = true
|
||||
thrs.server.BiRPCRegisterName(utils.ThresholdSv1, srv)
|
||||
}
|
||||
return nil
|
||||
@@ -119,13 +117,12 @@ func (thrs *ThresholdService) Shutdown() (err error) {
|
||||
thrs.Lock()
|
||||
defer thrs.Unlock()
|
||||
thrs.thrs.Shutdown()
|
||||
if thrs.birpcEnabled {
|
||||
thrs.server.StopBiRPC()
|
||||
thrs.birpcEnabled = false
|
||||
if thrs.cfg.ListenCfg().BiJSONListen != "" {
|
||||
thrs.server.BiRPCUnregisterName(utils.ThresholdSv1)
|
||||
}
|
||||
thrs.thrs = nil
|
||||
<-thrs.connChan
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
|
||||
@@ -109,3 +109,50 @@ func TestThresholdServiceStartBiRPC(t *testing.T) {
|
||||
t.Error("expected service to be running after Start()")
|
||||
}
|
||||
}
|
||||
|
||||
func TestThresholdServiceRestart(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
cfg.ThresholdSCfg().Enabled = true
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
shdChan := utils.NewSyncedChan()
|
||||
chS := engine.NewCacheS(cfg, nil, nil)
|
||||
close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles))
|
||||
close(chS.GetPrecacheChannel(utils.CacheThresholds))
|
||||
close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes))
|
||||
server := cores.NewServer(nil)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
db := NewDataDBService(cfg, nil, false, srvDep)
|
||||
db.GetDMChan() <- nil
|
||||
engine.NewConnManager(cfg, nil)
|
||||
|
||||
connChan := make(chan birpc.ClientConnector, 1)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server, connChan, nil, anz, srvDep)
|
||||
|
||||
if err := tS.Start(); err != nil {
|
||||
t.Fatalf("first Start() error: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
if tS.IsRunning() {
|
||||
_ = tS.Shutdown()
|
||||
}
|
||||
})
|
||||
if !tS.IsRunning() {
|
||||
t.Fatal("expected service to be running after first Start()")
|
||||
}
|
||||
|
||||
if err := tS.Shutdown(); err != nil {
|
||||
t.Fatalf("Shutdown() error: %v", err)
|
||||
}
|
||||
if tS.IsRunning() {
|
||||
t.Fatal("expected service to be stopped after Shutdown()")
|
||||
}
|
||||
|
||||
if err := tS.Start(); err != nil {
|
||||
t.Fatalf("second Start() error: %v", err)
|
||||
}
|
||||
if !tS.IsRunning() {
|
||||
t.Fatal("expected service to be running after second Start()")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user