diff --git a/agents/sipagent.go b/agents/sipagent.go
index 40978ca0c..cfb8faaab 100644
--- a/agents/sipagent.go
+++ b/agents/sipagent.go
@@ -52,10 +52,11 @@ var (
func NewSIPAgent(connMgr *engine.ConnManager, cfg *config.CGRConfig,
filterS *engine.FilterS) (sa *SIPAgent, err error) {
sa = &SIPAgent{
- connMgr: connMgr,
- filterS: filterS,
- cfg: cfg,
- ackMap: make(map[string]chan struct{}),
+ connMgr: connMgr,
+ filterS: filterS,
+ cfg: cfg,
+ ackMap: make(map[string]chan struct{}),
+ stopChan: make(chan struct{}),
}
msgTemplates := sa.cfg.TemplatesCfg()
// Inflate *template field types
@@ -96,7 +97,6 @@ func (sa *SIPAgent) Shutdown() {
// ListenAndServe will run the SIP handler doing also the connection to listen address
func (sa *SIPAgent) ListenAndServe() (err error) {
- sa.stopChan = make(chan struct{})
utils.Logger.Info(fmt.Sprintf("<%s> start listening on <%s:%s>",
utils.SIPAgent, sa.cfg.SIPAgentCfg().ListenNet, sa.cfg.SIPAgentCfg().Listen))
switch sa.cfg.SIPAgentCfg().ListenNet {
@@ -108,6 +108,9 @@ func (sa *SIPAgent) ListenAndServe() (err error) {
return fmt.Errorf("Unecepected protocol %s", sa.cfg.SIPAgentCfg().ListenNet)
}
}
+func (sa *SIPAgent) InitStopChan() {
+ sa.stopChan = make(chan struct{})
+}
func (sa *SIPAgent) serveUDP(stop chan struct{}) (err error) {
var conn net.PacketConn
diff --git a/services/sessions_it_test.go b/services/sessions_it_test.go
index 2a23b247e..19193f0fc 100644
--- a/services/sessions_it_test.go
+++ b/services/sessions_it_test.go
@@ -19,7 +19,21 @@ along with this program. If not, see
*/
package services
-/*
+import (
+ "path"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/cores"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/servmanager"
+ "github.com/cgrates/cgrates/sessions"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/rpcclient"
+)
+
func TestSessionSReload(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
@@ -84,48 +98,98 @@ func TestSessionSReload(t *testing.T) {
t.Errorf("Expected service to be down")
}
-
- var reply string
- if err := cfg.V1ReloadConfig(&config.ReloadArgs{
- Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongonew"),
- Section: config.SessionSJson,
- }, &reply); err != nil {
- t.Error(err)
- } else if reply != utils.OK {
- t.Errorf("Expecting OK ,received %s", reply)
- }
-
- time.Sleep(10 * time.Millisecond) //need to switch to gorutine
- if !srv.IsRunning() {
- t.Errorf("Expected service to be running")
- }
- time.Sleep(10 * time.Millisecond)
- if !db.IsRunning() {
- t.Errorf("Expected service to be running")
- }
-
-
- err := srv.Start()
- if err == nil || err != utils.ErrServiceAlreadyRunning {
- t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err)
+ var reply string
+ if err := cfg.V1ReloadConfig(&config.ReloadArgs{
+ Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongonew"),
+ Section: config.SessionSJson,
+ }, &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("Expecting OK ,received %s", reply)
}
- srv.Shutdown()
- srv.Start()
+ time.Sleep(10 * time.Millisecond) //need to switch to gorutine
+ if !srv.IsRunning() {
+ t.Errorf("Expected service to be running")
+ }
- err = srv.Reload()
- if err != nil {
- t.Errorf("\nExpecting ,\n Received <%+v>", err)
- }
- cfg.SessionSCfg().Enabled = false
- cfg.GetReloadChan(config.SessionSJson) <- struct{}{}
- time.Sleep(10 * time.Millisecond)
- if srv.IsRunning() {
- t.Errorf("Expected service to be down")
- }
- shdChan.CloseOnce()
- time.Sleep(10 * time.Millisecond)
+ err := srv.Reload()
+ if err != nil {
+ t.Errorf("\nExpecting ,\n Received <%+v>", err)
+ }
+ srv.(*SessionService).sm = &sessions.SessionS{}
+ err = srv.Shutdown()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ cfg.SessionSCfg().Enabled = false
+ cfg.GetReloadChan(config.SessionSJson) <- struct{}{}
+ time.Sleep(10 * time.Millisecond)
+ if srv.IsRunning() {
+ t.Errorf("Expected service to be down")
+ }
+ shdChan.CloseOnce()
+ time.Sleep(10 * time.Millisecond)
+
+}
+func TestSessionSReload2(t *testing.T) {
+ cfg := config.NewDefaultCGRConfig()
+
+ cfg.ChargerSCfg().Enabled = true
+ cfg.RalsCfg().Enabled = true
+ cfg.CdrsCfg().Enabled = true
+ utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID)
+ utils.Logger.SetLogLevel(7)
+ filterSChan := make(chan *engine.FilterS, 1)
+ filterSChan <- nil
+ shdChan := utils.NewSyncedChan()
+ chS := engine.NewCacheS(cfg, nil, nil)
+ close(chS.GetPrecacheChannel(utils.CacheChargerProfiles))
+ close(chS.GetPrecacheChannel(utils.CacheChargerFilterIndexes))
+ close(chS.GetPrecacheChannel(utils.CacheDestinations))
+ close(chS.GetPrecacheChannel(utils.CacheReverseDestinations))
+ close(chS.GetPrecacheChannel(utils.CacheRatingPlans))
+ close(chS.GetPrecacheChannel(utils.CacheRatingProfiles))
+ close(chS.GetPrecacheChannel(utils.CacheActions))
+ close(chS.GetPrecacheChannel(utils.CacheActionPlans))
+ close(chS.GetPrecacheChannel(utils.CacheAccountActionPlans))
+ close(chS.GetPrecacheChannel(utils.CacheActionTriggers))
+ close(chS.GetPrecacheChannel(utils.CacheSharedGroups))
+ close(chS.GetPrecacheChannel(utils.CacheTimings))
+
+ internalChan := make(chan rpcclient.ClientConnector, 1)
+ internalChan <- nil
+ cacheSChan := make(chan rpcclient.ClientConnector, 1)
+ cacheSChan <- chS
+
+ server := cores.NewServer(nil)
+
+ srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
+ db := NewDataDBService(cfg, nil, srvDep)
+ cfg.StorDbCfg().Type = utils.INTERNAL
+ anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
+ srv := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), shdChan, nil, nil, anz, srvDep)
+ engine.NewConnManager(cfg, nil)
+
+ srv.(*SessionService).sm = &sessions.SessionS{}
+ err := srv.IsRunning()
+ if err != true {
+ t.Errorf("\nExpecting ,\n Received <%+v>", err)
+ }
+ err2 := srv.Start()
+ if err2 != utils.ErrServiceAlreadyRunning {
+ t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err2)
+ }
+ cfg.SessionSCfg().Enabled = false
+ cfg.GetReloadChan(config.SessionSJson) <- struct{}{}
+ time.Sleep(10 * time.Millisecond)
+ srv.(*SessionService).sm = nil
+ if srv.IsRunning() {
+ t.Errorf("Expected service to be down")
+ }
+ shdChan.CloseOnce()
+ time.Sleep(10 * time.Millisecond)
}
-*/
diff --git a/services/sipagent.go b/services/sipagent.go
index f4648cdfc..9c16ffbd7 100644
--- a/services/sipagent.go
+++ b/services/sipagent.go
@@ -91,6 +91,7 @@ func (sip *SIPAgent) Reload() (err error) {
sip.Lock()
sip.sip.Shutdown()
sip.oldListen = sip.cfg.SIPAgentCfg().Listen
+ sip.sip.InitStopChan()
sip.Unlock()
go func() {
if err := sip.sip.ListenAndServe(); err != nil {
diff --git a/services/sipagent_it_test.go b/services/sipagent_it_test.go
index 8bea3aa63..fd14dcb01 100644
--- a/services/sipagent_it_test.go
+++ b/services/sipagent_it_test.go
@@ -101,6 +101,7 @@ func TestSIPAgentReload(t *testing.T) {
}
castSrv.oldListen = "test_string"
runtime.Gosched()
+ runtime.Gosched()
err = srv.Reload()
if err != nil {
t.Errorf("\nExpecting ,\n Received <%+v>", err)
diff --git a/sessions/sessions.go b/sessions/sessions.go
index f47be75d5..3ece5d79e 100644
--- a/sessions/sessions.go
+++ b/sessions/sessions.go
@@ -112,8 +112,14 @@ func (sS *SessionS) ListenAndServe(stopChan chan struct{}) {
// Shutdown is called by engine to clear states
func (sS *SessionS) Shutdown() (err error) {
+ var hasErr bool
for _, s := range sS.getSessions("", false) { // Force sessions shutdown
- sS.terminateSession(s, nil, nil, nil, false)
+ if err = sS.terminateSession(s, nil, nil, nil, false); err != nil {
+ hasErr = true
+ }
+ }
+ if hasErr {
+ return utils.ErrPartiallyExecuted
}
return
}