From e82c853fa34dfc9549c7c1d5070a2ebe4139c02b Mon Sep 17 00:00:00 2001 From: root Date: Tue, 16 Feb 2021 14:57:03 +0200 Subject: [PATCH] Refactoring in services --- engine/resources.go | 4 +-- engine/z_resources_test.go | 18 +++++----- services/resources.go | 8 ++--- services/thresholds_it_test.go | 65 ++++++++++++++++++++++++++++++++++ 4 files changed, 78 insertions(+), 17 deletions(-) diff --git a/engine/resources.go b/engine/resources.go index bf4b1bd59..b7a9cdfcc 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -314,7 +314,7 @@ func (rs Resources) allocateResource(ru *ResourceUsage, dryRun bool) (alcMessage // NewResourceService returns a new ResourceService func NewResourceService(dm *DataManager, cgrcfg *config.CGRConfig, - filterS *FilterS, connMgr *ConnManager) (*ResourceService, error) { + filterS *FilterS, connMgr *ConnManager) *ResourceService { return &ResourceService{dm: dm, storedResources: make(utils.StringSet), cgrcfg: cgrcfg, @@ -322,7 +322,7 @@ func NewResourceService(dm *DataManager, cgrcfg *config.CGRConfig, loopStoped: make(chan struct{}), stopBackup: make(chan struct{}), connMgr: connMgr, - }, nil + } } diff --git a/engine/z_resources_test.go b/engine/z_resources_test.go index 05a3dd836..4eba1f488 100644 --- a/engine/z_resources_test.go +++ b/engine/z_resources_test.go @@ -915,7 +915,7 @@ func TestResourceV1AuthorizeResourceMissingStruct(t *testing.T) { defaultCfg.ResourceSCfg().StoreInterval = 1 defaultCfg.ResourceSCfg().StringIndexedFields = nil defaultCfg.ResourceSCfg().PrefixIndexedFields = nil - resService, err := NewResourceService(dmRES, defaultCfg, + resService := NewResourceService(dmRES, defaultCfg, &FilterS{dm: dmRES, cfg: defaultCfg}, nil) if err != nil { t.Errorf("Error: %+v", err) @@ -1108,7 +1108,7 @@ func TestResourceMatchingResourcesForEvent(t *testing.T) { defaultCfg.ResourceSCfg().StoreInterval = 1 defaultCfg.ResourceSCfg().StringIndexedFields = nil defaultCfg.ResourceSCfg().PrefixIndexedFields = nil - resService, err := NewResourceService(dmRES, defaultCfg, + resService := NewResourceService(dmRES, defaultCfg, &FilterS{dm: dmRES, cfg: defaultCfg}, nil) if err != nil { t.Errorf("Error: %+v", err) @@ -1436,7 +1436,7 @@ func TestResourceUsageTTLCase1(t *testing.T) { defaultCfg.ResourceSCfg().StoreInterval = 1 defaultCfg.ResourceSCfg().StringIndexedFields = nil defaultCfg.ResourceSCfg().PrefixIndexedFields = nil - resService, err := NewResourceService(dmRES, defaultCfg, + resService := NewResourceService(dmRES, defaultCfg, &FilterS{dm: dmRES, cfg: defaultCfg}, nil) if err != nil { t.Errorf("Error: %+v", err) @@ -1641,7 +1641,7 @@ func TestResourceUsageTTLCase2(t *testing.T) { defaultCfg.ResourceSCfg().StoreInterval = 1 defaultCfg.ResourceSCfg().StringIndexedFields = nil defaultCfg.ResourceSCfg().PrefixIndexedFields = nil - resService, err := NewResourceService(dmRES, defaultCfg, + resService := NewResourceService(dmRES, defaultCfg, &FilterS{dm: dmRES, cfg: defaultCfg}, nil) if err != nil { t.Errorf("Error: %+v", err) @@ -1846,7 +1846,7 @@ func TestResourceUsageTTLCase3(t *testing.T) { defaultCfg.ResourceSCfg().StoreInterval = 1 defaultCfg.ResourceSCfg().StringIndexedFields = nil defaultCfg.ResourceSCfg().PrefixIndexedFields = nil - resService, err := NewResourceService(dmRES, defaultCfg, + resService := NewResourceService(dmRES, defaultCfg, &FilterS{dm: dmRES, cfg: defaultCfg}, nil) if err != nil { t.Errorf("Error: %+v", err) @@ -2051,7 +2051,7 @@ func TestResourceUsageTTLCase4(t *testing.T) { defaultCfg.ResourceSCfg().StoreInterval = 1 defaultCfg.ResourceSCfg().StringIndexedFields = nil defaultCfg.ResourceSCfg().PrefixIndexedFields = nil - resService, err := NewResourceService(dmRES, defaultCfg, + resService := NewResourceService(dmRES, defaultCfg, &FilterS{dm: dmRES, cfg: defaultCfg}, nil) if err != nil { t.Errorf("Error: %+v", err) @@ -2380,7 +2380,7 @@ func TestResourceMatchWithIndexFalse(t *testing.T) { defaultCfg.ResourceSCfg().StoreInterval = 1 defaultCfg.ResourceSCfg().StringIndexedFields = nil defaultCfg.ResourceSCfg().PrefixIndexedFields = nil - resService, err := NewResourceService(dmRES, defaultCfg, + resService := NewResourceService(dmRES, defaultCfg, &FilterS{dm: dmRES, cfg: defaultCfg}, nil) if err != nil { t.Errorf("Error: %+v", err) @@ -2607,7 +2607,7 @@ func TestResourceCaching(t *testing.T) { defaultCfg.ResourceSCfg().StoreInterval = 1 defaultCfg.ResourceSCfg().StringIndexedFields = nil defaultCfg.ResourceSCfg().PrefixIndexedFields = nil - resService, err := NewResourceService(dmRES, defaultCfg, + resService := NewResourceService(dmRES, defaultCfg, &FilterS{dm: dmRES, cfg: defaultCfg}, nil) if err != nil { t.Errorf("Error: %+v", err) @@ -2758,7 +2758,7 @@ func TestResourceCaching(t *testing.T) { defaultCfg.ResourceSCfg().StoreInterval = 1 defaultCfg.ResourceSCfg().StringIndexedFields = nil defaultCfg.ResourceSCfg().PrefixIndexedFields = nil - resService, err = NewResourceService(dmRES, defaultCfg, + resService = NewResourceService(dmRES, defaultCfg, &FilterS{dm: dmRES, cfg: defaultCfg}, nil) if err != nil { t.Errorf("Error: %+v", err) diff --git a/services/resources.go b/services/resources.go index f01122a2d..6116954e3 100644 --- a/services/resources.go +++ b/services/resources.go @@ -67,7 +67,7 @@ type ResourceService struct { srvDep map[string]*sync.WaitGroup } -// Start should handle the sercive start +// Start should handle the service start func (reS *ResourceService) Start() (err error) { if reS.IsRunning() { return utils.ErrServiceAlreadyRunning @@ -85,11 +85,7 @@ func (reS *ResourceService) Start() (err error) { reS.Lock() defer reS.Unlock() - reS.reS, err = engine.NewResourceService(datadb, reS.cfg, filterS, reS.connMgr) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ResourceS, err.Error())) - return - } + reS.reS = engine.NewResourceService(datadb, reS.cfg, filterS, reS.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ResourceS)) reS.reS.StartLoop() reS.rpc = v1.NewResourceSv1(reS.reS) diff --git a/services/thresholds_it_test.go b/services/thresholds_it_test.go index ae204ffbc..192080ee2 100644 --- a/services/thresholds_it_test.go +++ b/services/thresholds_it_test.go @@ -98,3 +98,68 @@ func TestThresholdSReload(t *testing.T) { shdChan.CloseOnce() time.Sleep(10 * time.Millisecond) } +func TestThresholdSReload2(t *testing.T) { + // utils.Logger.SetLogLevel(7) + cfg := config.NewDefaultCGRConfig() + + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) + close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) + close(chS.GetPrecacheChannel(utils.CacheThresholds)) + close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) + server := cores.NewServer(nil) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) + db := NewDataDBService(cfg, nil, srvDep) + tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz, srvDep) + engine.NewConnManager(cfg, nil) + srvMngr.AddServices(tS, + NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db) + if err := srvMngr.StartServices(); err != nil { + t.Error(err) + } + if tS.IsRunning() { + t.Errorf("Expected service to be down") + } + if db.IsRunning() { + t.Errorf("Expected service to be down") + } + var reply string + if err := cfg.V1ReloadConfig(&config.ReloadArgs{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"), + Section: config.THRESHOLDS_JSON, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expecting OK ,received %s", reply) + } + time.Sleep(10 * time.Millisecond) //need to switch to gorutine + if !tS.IsRunning() { + t.Errorf("Expected service to be running") + } + if !db.IsRunning() { + t.Errorf("Expected service to be running") + } + err := tS.Start() + if err == nil || err != utils.ErrServiceAlreadyRunning { + t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err) + } + err = tS.Reload() + if err != nil { + t.Errorf("\nExpecting ,\n Received <%+v>", err) + } + cfg.ThresholdSCfg().Enabled = false + cfg.GetReloadChan(config.THRESHOLDS_JSON) <- struct{}{} + time.Sleep(10 * time.Millisecond) + if tS.IsRunning() { + t.Errorf("Expected service to be down") + } + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) +}