diff --git a/services/engine.go b/services/engine.go index 49f19662b..97ec1d732 100644 --- a/services/engine.go +++ b/services/engine.go @@ -232,27 +232,40 @@ func startRPC(server *cores.Server, internalRaterChan, func startBiRPC(smg *SessionService, tS *ThresholdService, server *cores.Server, shdChan *utils.SyncedChan) { - var onConns []func(c birpc.ClientConnector) - var onDiss []func(c birpc.ClientConnector) - // wait for conn funcs to be populated only if service should run and BiRPC is populated - if smg.ShouldRun() { - onConn, onDisconn := smg.GetSessionSOnBiJSONFuncs() - onConns = append(onConns, onConn) - onDiss = append(onDiss, onDisconn) + onConns := []func(c birpc.ClientConnector){ + func(c birpc.ClientConnector) { + smg.RLock() + defer smg.RUnlock() + if smg.sm != nil { + smg.sm.OnBiJSONConnect(c) + } + }, + func(c birpc.ClientConnector) { + tS.RLock() + defer tS.RUnlock() + if tS.thrs != nil { + tS.thrs.OnBiJSONConnect(c) + } + }, } - if tS.ShouldRun() { - onConn, onDisconn := tS.GetThresholdSOnBiJSONFuncs() - onConns = append(onConns, onConn) - onDiss = append(onDiss, onDisconn) + onDiss := []func(c birpc.ClientConnector){ + func(c birpc.ClientConnector) { + smg.RLock() + defer smg.RUnlock() + if smg.sm != nil { + smg.sm.OnBiJSONDisconnect(c) + } + }, + func(c birpc.ClientConnector) { + tS.RLock() + defer tS.RUnlock() + if tS.thrs != nil { + tS.thrs.OnBiJSONDisconnect(c) + } + }, } if err := server.ServeBiRPC(cfg.ListenCfg().BiJSONListen, cfg.ListenCfg().BiGobListen, onConns, onDiss); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err)) - if smg.ShouldRun() { - smg.DisableBiRPC() - } - if tS.ShouldRun() { - tS.DisableBiRPC() - } shdChan.CloseOnce() } } diff --git a/services/sessions.go b/services/sessions.go index 85520eea8..2346cf302 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -32,39 +32,19 @@ import ( "github.com/cgrates/cgrates/utils" ) -type BiRPCSessionSFuncs struct { - sessionSOnBiJSONConnect func(c birpc.ClientConnector) //store OnBiJSONConnect - sessionSOnBiJSONDisconnect func(c birpc.ClientConnector) //store OnBiJSONDisconnect -} - -// Returns a new BiRPCSessionSFuncs struct -func NewSessionSBiJSONFuncs(onConn, onDisconn func(c birpc.ClientConnector)) *BiRPCSessionSFuncs { - return &BiRPCSessionSFuncs{ - sessionSOnBiJSONConnect: onConn, - sessionSOnBiJSONDisconnect: onDisconn, - } -} - -// GetSessionSOnBiJSONFuncs returns sessionSOnBiJSONConnect and sessionSOnBiJSONDisconnect (1 time use) -func (smg *SessionService) GetSessionSOnBiJSONFuncs() (onConn, onDisconn func(c birpc.ClientConnector)) { - <-smg.biRPCFuncsDone // Make sure funcs are initialized in the structure before getting them - return smg.biRPCFuncs.sessionSOnBiJSONConnect, smg.biRPCFuncs.sessionSOnBiJSONDisconnect -} - // NewSessionService returns the Session Service func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, server *cores.Server, internalChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) *SessionService { return &SessionService{ - connChan: internalChan, - cfg: cfg, - dm: dm, - server: server, - connMgr: connMgr, - anz: anz, - srvDep: srvDep, - biRPCFuncsDone: make(chan struct{}), + connChan: internalChan, + cfg: cfg, + dm: dm, + server: server, + connMgr: connMgr, + anz: anz, + srvDep: srvDep, } } @@ -79,13 +59,10 @@ type SessionService struct { sm *sessions.SessionS connChan chan birpc.ClientConnector - // in order to stop the bircp server if necesary - birpcEnabled bool - biRPCFuncs *BiRPCSessionSFuncs - biRPCFuncsDone chan struct{} // marks when biRPCFuncs are initialized - connMgr *engine.ConnManager - anz *AnalyzerService - srvDep map[string]*sync.WaitGroup + birpcEnabled bool + connMgr *engine.ConnManager + anz *AnalyzerService + srvDep map[string]*sync.WaitGroup } // Start should handle the sercive start @@ -136,21 +113,10 @@ func (smg *SessionService) Start() error { if smg.cfg.ListenCfg().BiJSONListen != utils.EmptyString { smg.birpcEnabled = true smg.server.BiRPCRegisterName(utils.SessionSv1, srv) - smg.biRPCFuncs = NewSessionSBiJSONFuncs(smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect) - smg.biRPCFuncsDone <- struct{}{} } return nil } -func (smg *SessionService) DisableBiRPC() { - if smg == nil { - return - } - smg.Lock() - smg.birpcEnabled = false - smg.Unlock() -} - // Reload handles the change of config func (smg *SessionService) Reload() (err error) { return diff --git a/services/sessions_test.go b/services/sessions_test.go index 2ffc230e7..d6bcc79b3 100644 --- a/services/sessions_test.go +++ b/services/sessions_test.go @@ -21,6 +21,7 @@ import ( "reflect" "sync" "testing" + "time" "github.com/cgrates/birpc" "github.com/cgrates/cgrates/sessions" @@ -89,3 +90,38 @@ func TestReload(t *testing.T) { t.Errorf("Expected no error, got %v", err) } } + +func TestSessionServiceStartBiRPC(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + // Default config already has BiJSONListen set. + shdChan := utils.NewSyncedChan() + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + server := cores.NewServer(nil) + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) + db := NewDataDBService(cfg, nil, false, srvDep) + engine.NewConnManager(cfg, nil) + + smg := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep) + + done := make(chan error, 1) + go func() { done <- smg.Start() }() + t.Cleanup(func() { + if smg.IsRunning() { + _ = smg.Shutdown() + } + }) + + select { + case err := <-done: + if err != nil { + t.Fatalf("Start() returned error: %v", err) + } + case <-time.After(5 * time.Second): + t.Fatal("Start() blocked for 5s, likely deadlock") + } + if !smg.IsRunning() { + t.Error("expected service to be running after Start()") + } +} diff --git a/services/thresholds.go b/services/thresholds.go index 58b8539e0..fba3f3d68 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -30,25 +30,6 @@ import ( "github.com/cgrates/cgrates/utils" ) -type BiRPCThresholdSFuncs struct { - thresholdSOnBiJSONConnect func(c birpc.ClientConnector) //store OnBiJSONConnect - thresholdSOnBiJSONDisconnect func(c birpc.ClientConnector) //store OnBiJSONDisconnect -} - -// Returns a new BiRPCThresholdSFuncs struct -func NewThresholdSBiJSONFuncs(onConn, onDisconn func(c birpc.ClientConnector)) *BiRPCThresholdSFuncs { - return &BiRPCThresholdSFuncs{ - thresholdSOnBiJSONConnect: onConn, - thresholdSOnBiJSONDisconnect: onDisconn, - } -} - -// GetThresholdSOnBiJSONFuncs returns ThresholdSOnBiJSONConnect and ThresholdSOnBiJSONDisconnect (1 time use) -func (thrs *ThresholdService) GetThresholdSOnBiJSONFuncs() (onConn, onDisconn func(c birpc.ClientConnector)) { - <-thrs.biRPCFuncsDone // Make sure funcs are initialized in the structure before getting them - return thrs.biRPCFuncs.thresholdSOnBiJSONConnect, thrs.biRPCFuncs.thresholdSOnBiJSONDisconnect -} - // NewThresholdService returns the Threshold Service func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, @@ -56,31 +37,28 @@ func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) *ThresholdService { return &ThresholdService{ - connChan: internalThresholdSChan, - cfg: cfg, - dm: dm, - cacheS: cacheS, - filterSChan: filterSChan, - server: server, - connMgr: connMgr, - anz: anz, - srvDep: srvDep, - biRPCFuncsDone: make(chan struct{}), + connChan: internalThresholdSChan, + cfg: cfg, + dm: dm, + cacheS: cacheS, + filterSChan: filterSChan, + server: server, + connMgr: connMgr, + anz: anz, + srvDep: srvDep, } } // ThresholdService implements Service interface type ThresholdService struct { sync.RWMutex - cfg *config.CGRConfig - dm *DataDBService - cacheS *engine.CacheS - filterSChan chan *engine.FilterS - server *cores.Server - birpcEnabled bool - biRPCFuncs *BiRPCThresholdSFuncs - biRPCFuncsDone chan struct{} // marks when biRPCFuncs are initialized - connMgr *engine.ConnManager + cfg *config.CGRConfig + dm *DataDBService + cacheS *engine.CacheS + filterSChan chan *engine.FilterS + server *cores.Server + birpcEnabled bool + connMgr *engine.ConnManager thrs *engine.ThresholdService connChan chan birpc.ClientConnector @@ -123,21 +101,10 @@ func (thrs *ThresholdService) Start() error { if thrs.cfg.ListenCfg().BiJSONListen != "" { thrs.birpcEnabled = true thrs.server.BiRPCRegisterName(utils.ThresholdSv1, srv) - thrs.biRPCFuncs = NewThresholdSBiJSONFuncs(thrs.thrs.OnBiJSONConnect, thrs.thrs.OnBiJSONDisconnect) - thrs.biRPCFuncsDone <- struct{}{} } return nil } -func (thrs *ThresholdService) DisableBiRPC() { - if thrs == nil { - return - } - thrs.Lock() - thrs.birpcEnabled = false - thrs.Unlock() -} - // Reload handles the change of config func (thrs *ThresholdService) Reload() (err error) { thrs.Lock() diff --git a/services/thresholds_it_test.go b/services/thresholds_it_test.go index 31c29567e..eb6978066 100644 --- a/services/thresholds_it_test.go +++ b/services/thresholds_it_test.go @@ -66,18 +66,6 @@ func TestThresholdSReload(t *testing.T) { if db.IsRunning() { t.Errorf("Expected service to be down") } - go func() { // simulate cgr-engine starting birpc serve - var onConns []func(c birpc.ClientConnector) - var onDiss []func(c birpc.ClientConnector) - if tS.ShouldRun() { - onConn, onDisconn := tS.GetThresholdSOnBiJSONFuncs() - onConns = append(onConns, onConn) - onDiss = append(onDiss, onDisconn) - } - if err := server.ServeBiRPC(cfg.ListenCfg().BiJSONListen, cfg.ListenCfg().BiGobListen, onConns, onDiss); err != nil { - t.Error(err) - } - }() var reply string if err := cfg.V1ReloadConfig(context.Background(), &config.ReloadArgs{ @@ -114,7 +102,6 @@ func TestThresholdSReload(t *testing.T) { } shdChan.CloseOnce() time.Sleep(10 * time.Millisecond) - tS.server.StopBiRPC() // needed when running tests in bulk } func TestThresholdSReload2(t *testing.T) { cfg := config.NewDefaultCGRConfig() @@ -146,18 +133,6 @@ func TestThresholdSReload2(t *testing.T) { if db.IsRunning() { t.Errorf("Expected service to be down") } - go func() { // simulate cgr-engine starting birpc serve - var onConns []func(c birpc.ClientConnector) - var onDiss []func(c birpc.ClientConnector) - if tS.ShouldRun() { - onConn, onDisconn := tS.GetThresholdSOnBiJSONFuncs() - onConns = append(onConns, onConn) - onDiss = append(onDiss, onDisconn) - } - if err := server.ServeBiRPC(cfg.ListenCfg().BiJSONListen, cfg.ListenCfg().BiGobListen, onConns, onDiss); err != nil { - t.Error(err) - } - }() var reply string if err := cfg.V1ReloadConfig(context.Background(), &config.ReloadArgs{ @@ -191,5 +166,4 @@ func TestThresholdSReload2(t *testing.T) { } shdChan.CloseOnce() time.Sleep(10 * time.Millisecond) - tS.server.StopBiRPC() // needed when running tests in bulk } diff --git a/services/thresholds_test.go b/services/thresholds_test.go index 1b7b694c4..5a1f267cc 100644 --- a/services/thresholds_test.go +++ b/services/thresholds_test.go @@ -21,6 +21,7 @@ import ( "reflect" "sync" "testing" + "time" "github.com/cgrates/birpc" "github.com/cgrates/cgrates/config" @@ -68,3 +69,43 @@ func TestThresholdSCoverage(t *testing.T) { t.Errorf("\nExpecting ,\n Received <%+v>", shouldRun) } } + +func TestThresholdServiceStartBiRPC(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.ThresholdSCfg().Enabled = true + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + chS := engine.NewCacheS(cfg, nil, nil) + close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) + close(chS.GetPrecacheChannel(utils.CacheThresholds)) + close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) + server := cores.NewServer(nil) + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) + db := NewDataDBService(cfg, nil, false, srvDep) + db.GetDMChan() <- nil + engine.NewConnManager(cfg, nil) + + tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep) + + done := make(chan error, 1) + go func() { done <- tS.Start() }() + t.Cleanup(func() { + if tS.IsRunning() { + _ = tS.Shutdown() + } + }) + + select { + case err := <-done: + if err != nil { + t.Fatalf("Start() returned error: %v", err) + } + case <-time.After(5 * time.Second): + t.Fatal("Start() blocked for 5s, likely deadlock") + } + if !tS.IsRunning() { + t.Error("expected service to be running after Start()") + } +}