fix birpc deadlock on startup

startBiRPC blocked waiting for services that hadn't started yet.
Now uses closures that check through the wrapper lock instead.
This commit is contained in:
ionutboangiu
2026-02-17 17:29:03 +02:00
committed by Dan Christian Bogos
parent f92023b49b
commit 7babf7b725
6 changed files with 134 additions and 137 deletions

View File

@@ -232,27 +232,40 @@ func startRPC(server *cores.Server, internalRaterChan,
func startBiRPC(smg *SessionService, tS *ThresholdService, server *cores.Server,
shdChan *utils.SyncedChan) {
var onConns []func(c birpc.ClientConnector)
var onDiss []func(c birpc.ClientConnector)
// wait for conn funcs to be populated only if service should run and BiRPC is populated
if smg.ShouldRun() {
onConn, onDisconn := smg.GetSessionSOnBiJSONFuncs()
onConns = append(onConns, onConn)
onDiss = append(onDiss, onDisconn)
onConns := []func(c birpc.ClientConnector){
func(c birpc.ClientConnector) {
smg.RLock()
defer smg.RUnlock()
if smg.sm != nil {
smg.sm.OnBiJSONConnect(c)
}
},
func(c birpc.ClientConnector) {
tS.RLock()
defer tS.RUnlock()
if tS.thrs != nil {
tS.thrs.OnBiJSONConnect(c)
}
},
}
if tS.ShouldRun() {
onConn, onDisconn := tS.GetThresholdSOnBiJSONFuncs()
onConns = append(onConns, onConn)
onDiss = append(onDiss, onDisconn)
onDiss := []func(c birpc.ClientConnector){
func(c birpc.ClientConnector) {
smg.RLock()
defer smg.RUnlock()
if smg.sm != nil {
smg.sm.OnBiJSONDisconnect(c)
}
},
func(c birpc.ClientConnector) {
tS.RLock()
defer tS.RUnlock()
if tS.thrs != nil {
tS.thrs.OnBiJSONDisconnect(c)
}
},
}
if err := server.ServeBiRPC(cfg.ListenCfg().BiJSONListen, cfg.ListenCfg().BiGobListen, onConns, onDiss); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err))
if smg.ShouldRun() {
smg.DisableBiRPC()
}
if tS.ShouldRun() {
tS.DisableBiRPC()
}
shdChan.CloseOnce()
}
}

View File

@@ -32,39 +32,19 @@ import (
"github.com/cgrates/cgrates/utils"
)
type BiRPCSessionSFuncs struct {
sessionSOnBiJSONConnect func(c birpc.ClientConnector) //store OnBiJSONConnect
sessionSOnBiJSONDisconnect func(c birpc.ClientConnector) //store OnBiJSONDisconnect
}
// Returns a new BiRPCSessionSFuncs struct
func NewSessionSBiJSONFuncs(onConn, onDisconn func(c birpc.ClientConnector)) *BiRPCSessionSFuncs {
return &BiRPCSessionSFuncs{
sessionSOnBiJSONConnect: onConn,
sessionSOnBiJSONDisconnect: onDisconn,
}
}
// GetSessionSOnBiJSONFuncs returns sessionSOnBiJSONConnect and sessionSOnBiJSONDisconnect (1 time use)
func (smg *SessionService) GetSessionSOnBiJSONFuncs() (onConn, onDisconn func(c birpc.ClientConnector)) {
<-smg.biRPCFuncsDone // Make sure funcs are initialized in the structure before getting them
return smg.biRPCFuncs.sessionSOnBiJSONConnect, smg.biRPCFuncs.sessionSOnBiJSONDisconnect
}
// NewSessionService returns the Session Service
func NewSessionService(cfg *config.CGRConfig, dm *DataDBService,
server *cores.Server, internalChan chan birpc.ClientConnector,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) *SessionService {
return &SessionService{
connChan: internalChan,
cfg: cfg,
dm: dm,
server: server,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
biRPCFuncsDone: make(chan struct{}),
connChan: internalChan,
cfg: cfg,
dm: dm,
server: server,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
}
}
@@ -79,13 +59,10 @@ type SessionService struct {
sm *sessions.SessionS
connChan chan birpc.ClientConnector
// in order to stop the bircp server if necesary
birpcEnabled bool
biRPCFuncs *BiRPCSessionSFuncs
biRPCFuncsDone chan struct{} // marks when biRPCFuncs are initialized
connMgr *engine.ConnManager
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
birpcEnabled bool
connMgr *engine.ConnManager
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
}
// Start should handle the sercive start
@@ -136,21 +113,10 @@ func (smg *SessionService) Start() error {
if smg.cfg.ListenCfg().BiJSONListen != utils.EmptyString {
smg.birpcEnabled = true
smg.server.BiRPCRegisterName(utils.SessionSv1, srv)
smg.biRPCFuncs = NewSessionSBiJSONFuncs(smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect)
smg.biRPCFuncsDone <- struct{}{}
}
return nil
}
func (smg *SessionService) DisableBiRPC() {
if smg == nil {
return
}
smg.Lock()
smg.birpcEnabled = false
smg.Unlock()
}
// Reload handles the change of config
func (smg *SessionService) Reload() (err error) {
return

View File

@@ -21,6 +21,7 @@ import (
"reflect"
"sync"
"testing"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/sessions"
@@ -89,3 +90,38 @@ func TestReload(t *testing.T) {
t.Errorf("Expected no error, got %v", err)
}
}
func TestSessionServiceStartBiRPC(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
// Default config already has BiJSONListen set.
shdChan := utils.NewSyncedChan()
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
server := cores.NewServer(nil)
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
db := NewDataDBService(cfg, nil, false, srvDep)
engine.NewConnManager(cfg, nil)
smg := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
done := make(chan error, 1)
go func() { done <- smg.Start() }()
t.Cleanup(func() {
if smg.IsRunning() {
_ = smg.Shutdown()
}
})
select {
case err := <-done:
if err != nil {
t.Fatalf("Start() returned error: %v", err)
}
case <-time.After(5 * time.Second):
t.Fatal("Start() blocked for 5s, likely deadlock")
}
if !smg.IsRunning() {
t.Error("expected service to be running after Start()")
}
}

View File

@@ -30,25 +30,6 @@ import (
"github.com/cgrates/cgrates/utils"
)
type BiRPCThresholdSFuncs struct {
thresholdSOnBiJSONConnect func(c birpc.ClientConnector) //store OnBiJSONConnect
thresholdSOnBiJSONDisconnect func(c birpc.ClientConnector) //store OnBiJSONDisconnect
}
// Returns a new BiRPCThresholdSFuncs struct
func NewThresholdSBiJSONFuncs(onConn, onDisconn func(c birpc.ClientConnector)) *BiRPCThresholdSFuncs {
return &BiRPCThresholdSFuncs{
thresholdSOnBiJSONConnect: onConn,
thresholdSOnBiJSONDisconnect: onDisconn,
}
}
// GetThresholdSOnBiJSONFuncs returns ThresholdSOnBiJSONConnect and ThresholdSOnBiJSONDisconnect (1 time use)
func (thrs *ThresholdService) GetThresholdSOnBiJSONFuncs() (onConn, onDisconn func(c birpc.ClientConnector)) {
<-thrs.biRPCFuncsDone // Make sure funcs are initialized in the structure before getting them
return thrs.biRPCFuncs.thresholdSOnBiJSONConnect, thrs.biRPCFuncs.thresholdSOnBiJSONDisconnect
}
// NewThresholdService returns the Threshold Service
func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
@@ -56,31 +37,28 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) *ThresholdService {
return &ThresholdService{
connChan: internalThresholdSChan,
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
biRPCFuncsDone: make(chan struct{}),
connChan: internalThresholdSChan,
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
}
}
// ThresholdService implements Service interface
type ThresholdService struct {
sync.RWMutex
cfg *config.CGRConfig
dm *DataDBService
cacheS *engine.CacheS
filterSChan chan *engine.FilterS
server *cores.Server
birpcEnabled bool
biRPCFuncs *BiRPCThresholdSFuncs
biRPCFuncsDone chan struct{} // marks when biRPCFuncs are initialized
connMgr *engine.ConnManager
cfg *config.CGRConfig
dm *DataDBService
cacheS *engine.CacheS
filterSChan chan *engine.FilterS
server *cores.Server
birpcEnabled bool
connMgr *engine.ConnManager
thrs *engine.ThresholdService
connChan chan birpc.ClientConnector
@@ -123,21 +101,10 @@ func (thrs *ThresholdService) Start() error {
if thrs.cfg.ListenCfg().BiJSONListen != "" {
thrs.birpcEnabled = true
thrs.server.BiRPCRegisterName(utils.ThresholdSv1, srv)
thrs.biRPCFuncs = NewThresholdSBiJSONFuncs(thrs.thrs.OnBiJSONConnect, thrs.thrs.OnBiJSONDisconnect)
thrs.biRPCFuncsDone <- struct{}{}
}
return nil
}
func (thrs *ThresholdService) DisableBiRPC() {
if thrs == nil {
return
}
thrs.Lock()
thrs.birpcEnabled = false
thrs.Unlock()
}
// Reload handles the change of config
func (thrs *ThresholdService) Reload() (err error) {
thrs.Lock()

View File

@@ -66,18 +66,6 @@ func TestThresholdSReload(t *testing.T) {
if db.IsRunning() {
t.Errorf("Expected service to be down")
}
go func() { // simulate cgr-engine starting birpc serve
var onConns []func(c birpc.ClientConnector)
var onDiss []func(c birpc.ClientConnector)
if tS.ShouldRun() {
onConn, onDisconn := tS.GetThresholdSOnBiJSONFuncs()
onConns = append(onConns, onConn)
onDiss = append(onDiss, onDisconn)
}
if err := server.ServeBiRPC(cfg.ListenCfg().BiJSONListen, cfg.ListenCfg().BiGobListen, onConns, onDiss); err != nil {
t.Error(err)
}
}()
var reply string
if err := cfg.V1ReloadConfig(context.Background(),
&config.ReloadArgs{
@@ -114,7 +102,6 @@ func TestThresholdSReload(t *testing.T) {
}
shdChan.CloseOnce()
time.Sleep(10 * time.Millisecond)
tS.server.StopBiRPC() // needed when running tests in bulk
}
func TestThresholdSReload2(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
@@ -146,18 +133,6 @@ func TestThresholdSReload2(t *testing.T) {
if db.IsRunning() {
t.Errorf("Expected service to be down")
}
go func() { // simulate cgr-engine starting birpc serve
var onConns []func(c birpc.ClientConnector)
var onDiss []func(c birpc.ClientConnector)
if tS.ShouldRun() {
onConn, onDisconn := tS.GetThresholdSOnBiJSONFuncs()
onConns = append(onConns, onConn)
onDiss = append(onDiss, onDisconn)
}
if err := server.ServeBiRPC(cfg.ListenCfg().BiJSONListen, cfg.ListenCfg().BiGobListen, onConns, onDiss); err != nil {
t.Error(err)
}
}()
var reply string
if err := cfg.V1ReloadConfig(context.Background(),
&config.ReloadArgs{
@@ -191,5 +166,4 @@ func TestThresholdSReload2(t *testing.T) {
}
shdChan.CloseOnce()
time.Sleep(10 * time.Millisecond)
tS.server.StopBiRPC() // needed when running tests in bulk
}

View File

@@ -21,6 +21,7 @@ import (
"reflect"
"sync"
"testing"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/config"
@@ -68,3 +69,43 @@ func TestThresholdSCoverage(t *testing.T) {
t.Errorf("\nExpecting <false>,\n Received <%+v>", shouldRun)
}
}
func TestThresholdServiceStartBiRPC(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ThresholdSCfg().Enabled = true
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
shdChan := utils.NewSyncedChan()
chS := engine.NewCacheS(cfg, nil, nil)
close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles))
close(chS.GetPrecacheChannel(utils.CacheThresholds))
close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes))
server := cores.NewServer(nil)
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
db := NewDataDBService(cfg, nil, false, srvDep)
db.GetDMChan() <- nil
engine.NewConnManager(cfg, nil)
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
done := make(chan error, 1)
go func() { done <- tS.Start() }()
t.Cleanup(func() {
if tS.IsRunning() {
_ = tS.Shutdown()
}
})
select {
case err := <-done:
if err != nil {
t.Fatalf("Start() returned error: %v", err)
}
case <-time.After(5 * time.Second):
t.Fatal("Start() blocked for 5s, likely deadlock")
}
if !tS.IsRunning() {
t.Error("expected service to be running after Start()")
}
}