From 23a7561d97bc67aebb56957eed09b900aa24879d Mon Sep 17 00:00:00 2001 From: andronache Date: Tue, 16 Feb 2021 13:57:40 +0200 Subject: [PATCH] Functions refactored in services --- engine/stats.go | 4 +- engine/stats_test.go | 29 ++++-------- services/diameteragent.go | 1 - services/diameteragent_it_test.go | 75 ++++++++++++++++++++++++++++--- services/stats.go | 9 +--- 5 files changed, 81 insertions(+), 37 deletions(-) diff --git a/engine/stats.go b/engine/stats.go index 17b06b1ba..de48da82a 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -31,7 +31,7 @@ import ( // NewStatService initializes a StatService func NewStatService(dm *DataManager, cgrcfg *config.CGRConfig, - filterS *FilterS, connMgr *ConnManager) (ss *StatService, err error) { + filterS *FilterS, connMgr *ConnManager) (ss *StatService) { return &StatService{ dm: dm, @@ -41,7 +41,7 @@ func NewStatService(dm *DataManager, cgrcfg *config.CGRConfig, storedStatQueues: make(utils.StringSet), loopStoped: make(chan struct{}), stopBackup: make(chan struct{}), - }, nil + } } // StatService builds stats for events diff --git a/engine/stats_test.go b/engine/stats_test.go index b860df695..63a6ccb30 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -37,10 +37,7 @@ func TestNewStatService(t *testing.T) { cgrcfg: testCgrCfg, storedStatQueues: make(utils.StringSet), } - result, err := NewStatService(testDM, testCgrCfg, testFltrS, nil) - if err != nil { - t.Errorf("\nExpecting ,\n Received <%+v>", err) - } + result := NewStatService(testDM, testCgrCfg, testFltrS, nil) if !reflect.DeepEqual(expStruct.dm, result.dm) { t.Errorf("\nExpecting <%+v>,\n Received <%+v>", expStruct.dm, result.dm) } @@ -173,11 +170,9 @@ func TestMatchingStatQueuesForEvent(t *testing.T) { defaultCfg.StatSCfg().StoreInterval = 1 defaultCfg.StatSCfg().StringIndexedFields = nil defaultCfg.StatSCfg().PrefixIndexedFields = nil - statService, err = NewStatService(dmSTS, defaultCfg, + statService = NewStatService(dmSTS, defaultCfg, &FilterS{dm: dmSTS, cfg: defaultCfg}, nil) - if err != nil { - t.Errorf("Error: %+v", err) - } + fltrSts1 := &Filter{ Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, ID: "FLTR_STATS_1", @@ -415,11 +410,9 @@ func TestStatQueuesProcessEvent(t *testing.T) { defaultCfg.StatSCfg().StoreInterval = 1 defaultCfg.StatSCfg().StringIndexedFields = nil defaultCfg.StatSCfg().PrefixIndexedFields = nil - statService, err = NewStatService(dmSTS, defaultCfg, + statService = NewStatService(dmSTS, defaultCfg, &FilterS{dm: dmSTS, cfg: defaultCfg}, nil) - if err != nil { - t.Errorf("Error: %+v", err) - } + fltrSts1 := &Filter{ Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, ID: "FLTR_STATS_1", @@ -658,11 +651,9 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) { defaultCfg.StatSCfg().StoreInterval = 1 defaultCfg.StatSCfg().StringIndexedFields = nil defaultCfg.StatSCfg().PrefixIndexedFields = nil - statService, err = NewStatService(dmSTS, defaultCfg, + statService = NewStatService(dmSTS, defaultCfg, &FilterS{dm: dmSTS, cfg: defaultCfg}, nil) - if err != nil { - t.Errorf("Error: %+v", err) - } + fltrSts1 := &Filter{ Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, ID: "FLTR_STATS_1", @@ -901,11 +892,9 @@ func TestStatQueuesV1ProcessEvent(t *testing.T) { defaultCfg.StatSCfg().StoreInterval = 1 defaultCfg.StatSCfg().StringIndexedFields = nil defaultCfg.StatSCfg().PrefixIndexedFields = nil - statService, err = NewStatService(dmSTS, defaultCfg, + statService = NewStatService(dmSTS, defaultCfg, &FilterS{dm: dmSTS, cfg: defaultCfg}, nil) - if err != nil { - t.Errorf("Error: %+v", err) - } + fltrSts1 := &Filter{ Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, ID: "FLTR_STATS_1", diff --git a/services/diameteragent.go b/services/diameteragent.go index 9bc6e5197..689bcaded 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -111,7 +111,6 @@ func (da *DiameterAgent) Reload() (err error) { func (da *DiameterAgent) Shutdown() (err error) { da.Lock() close(da.stopChan) - da.da = nil da.Unlock() return // no shutdown for the momment diff --git a/services/diameteragent_it_test.go b/services/diameteragent_it_test.go index d45346d9a..3d184e882 100644 --- a/services/diameteragent_it_test.go +++ b/services/diameteragent_it_test.go @@ -35,7 +35,6 @@ import ( func TestDiameterAgentReload1(t *testing.T) { cfg := config.NewDefaultCGRConfig() - cfg.SessionSCfg().Enabled = true utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) utils.Logger.SetLogLevel(7) @@ -44,10 +43,75 @@ func TestDiameterAgentReload1(t *testing.T) { shdChan := utils.NewSyncedChan() shdWg := new(sync.WaitGroup) chS := engine.NewCacheS(cfg, nil, nil) - cacheSChan := make(chan rpcclient.ClientConnector, 1) cacheSChan <- chS + server := cores.NewServer(nil) + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) + db := NewDataDBService(cfg, nil, srvDep) + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) + sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), + shdChan, nil, nil, anz, srvDep) + srv := NewDiameterAgent(cfg, filterSChan, shdChan, nil, srvDep) + engine.NewConnManager(cfg, nil) + srvMngr.AddServices(srv, sS, + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db) + if err := srvMngr.StartServices(); err != nil { + t.Fatal(err) + } + if srv.IsRunning() { + t.Errorf("Expected service to be down") + } + var reply string + if err := cfg.V1ReloadConfig(&config.ReloadArgs{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "diamagent_mysql"), + Section: config.DA_JSN, + }, &reply); err != nil { + t.Fatal(err) + } else if reply != utils.OK { + t.Errorf("Expecting OK ,received %s", reply) + } + time.Sleep(10 * time.Millisecond) //need to switch to gorutine + if !srv.IsRunning() { + t.Errorf("Expected service to be running") + } + err := srv.Start() + if err == nil || err != utils.ErrServiceAlreadyRunning { + t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err) + } + err = srv.Reload() + if err != nil { + t.Errorf("\nExpecting ,\n Received <%+v>", err) + } + cfg.DiameterAgentCfg().Enabled = false + cfg.GetReloadChan(config.DA_JSN) <- struct{}{} + srv.(*DiameterAgent).lnet = "bad_lnet_test" + err2 := srv.Reload() + if err != nil { + t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err2) + } + time.Sleep(10 * time.Millisecond) + if srv.IsRunning() { + t.Errorf("Expected service to be down") + } + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) +} + +/* +func TestDiameterAgentReload2(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cfg.SessionSCfg().Enabled = true + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) + cacheSChan := make(chan rpcclient.ClientConnector, 1) + cacheSChan <- chS server := cores.NewServer(nil) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) @@ -88,11 +152,7 @@ func TestDiameterAgentReload1(t *testing.T) { } cfg.DiameterAgentCfg().Enabled = false cfg.GetReloadChan(config.DA_JSN) <- struct{}{} - srv.(*DiameterAgent).lnet = "bad_lnet_test" - err2 := srv.Reload() - if err != nil { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err2) - } + srv.Shutdown() time.Sleep(10 * time.Millisecond) if srv.IsRunning() { t.Errorf("Expected service to be down") @@ -100,3 +160,4 @@ func TestDiameterAgentReload1(t *testing.T) { shdChan.CloseOnce() time.Sleep(10 * time.Millisecond) } +*/ diff --git a/services/stats.go b/services/stats.go index eb735d51a..ffd8200e6 100644 --- a/services/stats.go +++ b/services/stats.go @@ -86,13 +86,8 @@ func (sts *StatService) Start() (err error) { sts.Lock() defer sts.Unlock() - if sts.sts, err = engine.NewStatService(datadb, - sts.cfg, filterS, sts.connMgr); err != nil { - utils.Logger.Crit( - fmt.Sprintf(" Could not init, error: %s", - err.Error())) - return - } + sts.sts = engine.NewStatService(datadb, sts.cfg, filterS, sts.connMgr) + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.StatS)) sts.sts.StartLoop()