mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
remove loaders subsystem
This commit is contained in:
committed by
Dan Christian Bogos
parent
16370dbe53
commit
3153bc8378
@@ -56,8 +56,7 @@ func TestAnalyzerSReload(t *testing.T) {
|
||||
anzRPC := make(chan birpc.ClientConnector, 1)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, anzRPC, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(anz,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(anz, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -69,8 +69,7 @@ func TestApiersReload(t *testing.T) {
|
||||
make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
|
||||
|
||||
apiSv2 := NewAPIerSv2Service(apiSv1, cfg, server, make(chan birpc.ClientConnector, 1), anz, srvDep)
|
||||
srvMngr.AddServices(apiSv1, apiSv2, schS, tS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db, stordb)
|
||||
srvMngr.AddServices(apiSv1, apiSv2, schS, tS, db, stordb)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -71,8 +71,7 @@ func TestAsteriskAgentReload(t *testing.T) {
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, cm, anz, srvDep)
|
||||
astService := NewAsteriskAgent(cfg, shdChan, cm, srvDep)
|
||||
srvMngr.AddServices(astService, sS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(astService, sS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -144,8 +143,7 @@ func TestAsteriskAgentReload2(t *testing.T) {
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, cm, anz, srvDep)
|
||||
astSrv := NewAsteriskAgent(cfg, shdChan, cm, srvDep)
|
||||
srvMngr.AddServices(astSrv, sS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(astSrv, sS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -58,8 +58,7 @@ func TestAttributeSReload(t *testing.T) {
|
||||
chS, filterSChan, server, attrRPC,
|
||||
anz, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(attrS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(attrS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -77,9 +77,7 @@ func TestCdrsReload(t *testing.T) {
|
||||
cdrsRPC := make(chan birpc.ClientConnector, 1)
|
||||
cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server,
|
||||
cdrsRPC, nil, anz, srvDep)
|
||||
srvMngr.AddServices(cdrS, ralS, schS, chrS,
|
||||
NewLoaderService(cfg, db, filterSChan, server,
|
||||
make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db, stordb)
|
||||
srvMngr.AddServices(cdrS, ralS, schS, chrS, db, stordb)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -58,9 +58,7 @@ func TestChargerSReload(t *testing.T) {
|
||||
attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan birpc.ClientConnector, 1), anz, srvDep)
|
||||
chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(attrS, chrS,
|
||||
NewLoaderService(cfg, db, filterSChan, server,
|
||||
make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(attrS, chrS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -54,8 +54,7 @@ func TestCoreSReload(t *testing.T) {
|
||||
caps := engine.NewCaps(1, "test_caps")
|
||||
coreS := NewCoreService(cfg, caps, server, coreRPC, anz, nil, nil, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(coreS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(coreS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -145,7 +145,7 @@ func (db *DataDBService) mandatoryDB() bool {
|
||||
return db.cfg.RalsCfg().Enabled || db.cfg.SchedulerCfg().Enabled || db.cfg.ChargerSCfg().Enabled ||
|
||||
db.cfg.AttributeSCfg().Enabled || db.cfg.ResourceSCfg().Enabled || db.cfg.StatSCfg().Enabled ||
|
||||
db.cfg.ThresholdSCfg().Enabled || db.cfg.RouteSCfg().Enabled || db.cfg.DispatcherSCfg().Enabled ||
|
||||
db.cfg.LoaderCfg().Enabled() || db.cfg.ApierCfg().Enabled || db.cfg.AnalyzerSCfg().Enabled || db.cfg.ERsCfg().Enabled
|
||||
db.cfg.ApierCfg().Enabled || db.cfg.AnalyzerSCfg().Enabled || db.cfg.ERsCfg().Enabled
|
||||
}
|
||||
|
||||
// GetDM returns the DataManager
|
||||
|
||||
@@ -56,8 +56,7 @@ func TestDataDBReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg, cM, false, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
srvMngr.AddServices(NewAttributeService(cfg, db,
|
||||
chS, filterSChan, server, make(chan birpc.ClientConnector, 1), anz, srvDep),
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
chS, filterSChan, server, make(chan birpc.ClientConnector, 1), anz, srvDep), db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -61,8 +61,7 @@ func TestDiameterAgentReload1(t *testing.T) {
|
||||
shdChan, nil, anz, srvDep)
|
||||
diamSrv := NewDiameterAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(diamSrv, sS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(diamSrv, sS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -60,9 +60,7 @@ func TestDispatcherSReload(t *testing.T) {
|
||||
srv := NewDispatcherService(cfg, db, chS, filterSChan, server,
|
||||
make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(attrS, srv,
|
||||
NewLoaderService(cfg, db, filterSChan, server,
|
||||
make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(attrS, srv, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -67,8 +67,7 @@ func TestDNSAgentStartReloadShut(t *testing.T) {
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, nil, anz, srvDep)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(srv, sS, db)
|
||||
runtime.Gosched()
|
||||
time.Sleep(10 * time.Millisecond) //need to switch to gorutine
|
||||
if err := srv.Shutdown(); err != nil {
|
||||
@@ -124,8 +123,7 @@ func TestDNSAgentReloadFirst(t *testing.T) {
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewDNSAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(srv, sS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -68,8 +68,7 @@ func TestEventExporterSReload(t *testing.T) {
|
||||
anz, srvDep)
|
||||
ees := NewEventExporterService(cfg, filterSChan, engine.NewConnManager(cfg, nil),
|
||||
server, make(chan birpc.ClientConnector, 2), anz, srvDep)
|
||||
srvMngr.AddServices(ees, attrS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(ees, attrS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -70,8 +70,7 @@ func TestEventReaderSReload(t *testing.T) {
|
||||
intERsConn := make(chan birpc.ClientConnector, 1)
|
||||
erS := NewEventReaderService(cfg, db, filterSChan, shdChan, nil, server, intERsConn, anz, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(erS, sS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(erS, sS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -72,8 +72,7 @@ func TestFreeSwitchAgentReload(t *testing.T) {
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, cm, anz, srvDep)
|
||||
srv := NewFreeswitchAgent(cfg, shdChan, cm, srvDep)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), cm, anz, srvDep), db)
|
||||
srvMngr.AddServices(srv, sS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -66,8 +66,7 @@ func TestHTTPAgentReload(t *testing.T) {
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewHTTPAgent(cfg, filterSChan, server, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(srv, sS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -76,8 +76,7 @@ func TestKamailioAgentReload(t *testing.T) {
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, cm, anz, srvDep)
|
||||
srv := NewKamailioAgent(cfg, shdChan, cm, srvDep)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), cm, anz, srvDep), db)
|
||||
srvMngr.AddServices(srv, sS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -1,154 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with this program. If not, see <https://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/loaders"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// NewLoaderService returns the Loader Service
|
||||
func NewLoaderService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
filterSChan chan *engine.FilterS, server *cores.Server,
|
||||
internalLoaderSChan chan birpc.ClientConnector,
|
||||
connMgr *engine.ConnManager, anz *AnalyzerService,
|
||||
srvDep map[string]*sync.WaitGroup) *LoaderService {
|
||||
return &LoaderService{
|
||||
connChan: internalLoaderSChan,
|
||||
cfg: cfg,
|
||||
dm: dm,
|
||||
filterSChan: filterSChan,
|
||||
server: server,
|
||||
connMgr: connMgr,
|
||||
stopChan: make(chan struct{}),
|
||||
anz: anz,
|
||||
srvDep: srvDep,
|
||||
}
|
||||
}
|
||||
|
||||
// LoaderService implements Service interface
|
||||
type LoaderService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
dm *DataDBService
|
||||
filterSChan chan *engine.FilterS
|
||||
server *cores.Server
|
||||
stopChan chan struct{}
|
||||
|
||||
ldrs *loaders.LoaderService
|
||||
connChan chan birpc.ClientConnector
|
||||
connMgr *engine.ConnManager
|
||||
anz *AnalyzerService
|
||||
srvDep map[string]*sync.WaitGroup
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (ldrs *LoaderService) Start() error {
|
||||
if ldrs.IsRunning() {
|
||||
return utils.ErrServiceAlreadyRunning
|
||||
}
|
||||
|
||||
filterS := <-ldrs.filterSChan
|
||||
ldrs.filterSChan <- filterS
|
||||
dbchan := ldrs.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
ldrs.Lock()
|
||||
defer ldrs.Unlock()
|
||||
|
||||
ldrs.ldrs = loaders.NewLoaderService(datadb, ldrs.cfg.LoaderCfg(),
|
||||
ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.cfg.GeneralCfg().CachingDelay, filterS, ldrs.connMgr)
|
||||
|
||||
if !ldrs.ldrs.Enabled() {
|
||||
return nil
|
||||
}
|
||||
if err := ldrs.ldrs.ListenAndServe(ldrs.stopChan); err != nil {
|
||||
return err
|
||||
}
|
||||
srv, err := engine.NewService(v1.NewLoaderSv1(ldrs.ldrs))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ldrs.cfg.DispatcherSCfg().Enabled {
|
||||
ldrs.server.RpcRegister(srv)
|
||||
}
|
||||
ldrs.connChan <- ldrs.anz.GetInternalCodec(srv, utils.LoaderS)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (ldrs *LoaderService) Reload() (err error) {
|
||||
filterS := <-ldrs.filterSChan
|
||||
ldrs.filterSChan <- filterS
|
||||
dbchan := ldrs.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
close(ldrs.stopChan)
|
||||
ldrs.stopChan = make(chan struct{})
|
||||
|
||||
ldrs.RLock()
|
||||
|
||||
ldrs.ldrs.Reload(datadb, ldrs.cfg.LoaderCfg(), ldrs.cfg.GeneralCfg().DefaultTimezone, ldrs.cfg.GeneralCfg().CachingDelay,
|
||||
filterS, ldrs.connMgr)
|
||||
if err = ldrs.ldrs.ListenAndServe(ldrs.stopChan); err != nil {
|
||||
return
|
||||
}
|
||||
ldrs.RUnlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (ldrs *LoaderService) Shutdown() (err error) {
|
||||
ldrs.Lock()
|
||||
ldrs.ldrs = nil
|
||||
close(ldrs.stopChan)
|
||||
<-ldrs.connChan
|
||||
ldrs.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (ldrs *LoaderService) IsRunning() bool {
|
||||
ldrs.RLock()
|
||||
defer ldrs.RUnlock()
|
||||
return ldrs.ldrs != nil && ldrs.ldrs.Enabled()
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (ldrs *LoaderService) ServiceName() string {
|
||||
return utils.LoaderS
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (ldrs *LoaderService) ShouldRun() bool {
|
||||
return ldrs.cfg.LoaderCfg().Enabled()
|
||||
}
|
||||
|
||||
// GetLoaderS returns the initialized LoaderService
|
||||
func (ldrs *LoaderService) GetLoaderS() *loaders.LoaderService {
|
||||
return ldrs.ldrs
|
||||
}
|
||||
@@ -1,200 +0,0 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with this program. If not, see <https://www.gnu.org/licenses/>
|
||||
*/
|
||||
package services
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func testCreateDirs(t *testing.T) {
|
||||
for _, dir := range []string{"/tmp/In", "/tmp/Out", "/tmp/LoaderIn", "/tmp/SubpathWithoutMove",
|
||||
"/tmp/SubpathLoaderWithMove", "/tmp/SubpathOut", "/tmp/templateLoaderIn", "/tmp/templateLoaderOut",
|
||||
"/tmp/customSepLoaderIn", "/tmp/customSepLoaderOut"} {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Fatal("Error removing folder: ", dir, err)
|
||||
}
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
t.Fatal("Error creating folder: ", dir, err)
|
||||
}
|
||||
}
|
||||
if err := os.WriteFile(path.Join("/tmp/In", utils.AttributesCsv), []byte(`
|
||||
#Tenant,ID,Contexts,FilterIDs,ActivationInterval,AttributeFilterIDs,Path,Type,Value,Blocker,Weight
|
||||
cgrates.org,ALS1,con1,*string:~*req.Account:1001,2014-07-29T15:00:00Z,*string:~*req.Field1:Initial,*req.Field1,*variable,Sub1,true,20
|
||||
cgrates.org,ALS1,con2;con3,,,,*req.Field2,*variable,Sub2,true,20
|
||||
`), 0644); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoaderSReload(t *testing.T) {
|
||||
testCreateDirs(t)
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
cfg.TemplatesCfg()["attrTemplateLoader"] = []*config.FCTemplate{
|
||||
{
|
||||
Type: utils.MetaVariable,
|
||||
Path: "*req.Accounts",
|
||||
Value: config.NewRSRParsersMustCompile("1001", utils.InfieldSep),
|
||||
},
|
||||
}
|
||||
utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID)
|
||||
utils.Logger.SetLogLevel(7)
|
||||
|
||||
shdChan := utils.NewSyncedChan()
|
||||
shdWg := new(sync.WaitGroup)
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
server := cores.NewServer(nil)
|
||||
srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, false, srvDep)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
conMngr := engine.NewConnManager(cfg, nil)
|
||||
srv := NewLoaderService(cfg, db, filterSChan,
|
||||
server, make(chan birpc.ClientConnector, 1),
|
||||
conMngr, anz, srvDep)
|
||||
srvMngr.AddServices(srv, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if db.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
|
||||
if srv.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
|
||||
var reply string
|
||||
if err := cfg.V1ReloadConfig(context.Background(),
|
||||
&config.ReloadArgs{
|
||||
Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "loaders", "tutinternal"),
|
||||
Section: config.LoaderJson,
|
||||
}, &reply); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Expecting OK ,received %s", reply)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if !db.IsRunning() {
|
||||
t.Fatal("Expected service to be running")
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if !srv.IsRunning() {
|
||||
t.Fatal("Expected service to be running")
|
||||
}
|
||||
|
||||
err := srv.Start()
|
||||
if err == nil || err != utils.ErrServiceAlreadyRunning {
|
||||
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.ErrServiceAlreadyRunning, err)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
err = srv.Reload()
|
||||
if err != nil {
|
||||
t.Errorf("\nExpecting <nil>,\n Received <%+v>", err)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
for _, v := range cfg.LoaderCfg() {
|
||||
v.Enabled = false
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
cfg.GetReloadChan(config.LoaderJson) <- struct{}{}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
if srv.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
|
||||
shdChan.CloseOnce()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
testCleanupFiles(t)
|
||||
}
|
||||
func testCleanupFiles(t *testing.T) {
|
||||
for _, dir := range []string{"/tmp/In", "/tmp/Out", "/tmp/LoaderIn", "/tmp/SubpathWithoutMove",
|
||||
"/tmp/SubpathLoaderWithMove", "/tmp/SubpathOut"} {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Fatal("Error removing folder: ", dir, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoaderSReload2(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
for _, ld := range cfg.LoaderCfg() {
|
||||
ld.Enabled = false
|
||||
}
|
||||
shdChan := utils.NewSyncedChan()
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
server := cores.NewServer(nil)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, false, srvDep)
|
||||
db.dbchan <- new(engine.DataManager)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
srv := NewLoaderService(cfg, db, filterSChan,
|
||||
server, make(chan birpc.ClientConnector, 1),
|
||||
nil, anz, srvDep)
|
||||
err := srv.Start()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoaderSReload3(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
for _, ld := range cfg.LoaderCfg() {
|
||||
ld.Enabled = false
|
||||
}
|
||||
cfg.LoaderCfg()[0].Enabled = true
|
||||
cfg.LoaderCfg()[0].TpInDir = "/tmp/TestLoaderSReload3"
|
||||
cfg.LoaderCfg()[0].RunDelay = -1
|
||||
shdChan := utils.NewSyncedChan()
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
server := cores.NewServer(nil)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, false, srvDep)
|
||||
db.dbchan <- new(engine.DataManager)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
srv := NewLoaderService(cfg, db, filterSChan,
|
||||
server, make(chan birpc.ClientConnector, 1),
|
||||
nil, anz, srvDep)
|
||||
err := srv.Start()
|
||||
if err == nil || err.Error() != "no such file or directory" {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "no such file or directory", err)
|
||||
}
|
||||
err = srv.Reload()
|
||||
if err == nil || err.Error() != "no such file or directory" {
|
||||
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "no such file or directory", err)
|
||||
}
|
||||
}
|
||||
@@ -1,97 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with this program. If not, see <https://www.gnu.org/licenses/>
|
||||
*/
|
||||
package services
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/cgrates/loaders"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// TestLoaderSCoverage for cover testing
|
||||
func TestLoaderSCoverage(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
shdChan := utils.NewSyncedChan()
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
server := cores.NewServer(nil)
|
||||
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
|
||||
db := NewDataDBService(cfg, nil, false, srvDep)
|
||||
internalLoaderSChan := make(chan birpc.ClientConnector, 1)
|
||||
rpcInternal := map[string]chan birpc.ClientConnector{}
|
||||
cM := engine.NewConnManager(cfg, rpcInternal)
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
srv := NewLoaderService(cfg, db,
|
||||
filterSChan, server, internalLoaderSChan,
|
||||
cM, anz, srvDep)
|
||||
if srv == nil {
|
||||
t.Errorf("\nExpecting <nil>,\n Received <%+v>", utils.ToJSON(srv))
|
||||
}
|
||||
if srv.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
srv.ldrs = loaders.NewLoaderService(&engine.DataManager{},
|
||||
[]*config.LoaderSCfg{{
|
||||
ID: "test_id",
|
||||
Enabled: true,
|
||||
Tenant: "",
|
||||
DryRun: false,
|
||||
RunDelay: 0,
|
||||
LockFilePath: "",
|
||||
CacheSConns: nil,
|
||||
FieldSeparator: "",
|
||||
TpInDir: "",
|
||||
TpOutDir: "",
|
||||
Data: nil,
|
||||
}}, "", 0,
|
||||
&engine.FilterS{}, nil)
|
||||
if !srv.IsRunning() {
|
||||
t.Errorf("Expected service to be running")
|
||||
}
|
||||
serviceName := srv.ServiceName()
|
||||
if !reflect.DeepEqual(serviceName, utils.LoaderS) {
|
||||
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.LoaderS, serviceName)
|
||||
}
|
||||
shouldRun := srv.ShouldRun()
|
||||
if !reflect.DeepEqual(shouldRun, false) {
|
||||
t.Errorf("\nExpecting <false>,\n Received <%+v>", shouldRun)
|
||||
}
|
||||
if !reflect.DeepEqual(srv.GetLoaderS(), srv.ldrs) {
|
||||
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", srv.ldrs, srv.GetLoaderS())
|
||||
}
|
||||
srv.stopChan = make(chan struct{}, 1)
|
||||
chS := engine.NewCacheS(cfg, nil, nil)
|
||||
cacheSrv, err := engine.NewService(chS)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
srv.connChan <- cacheSrv
|
||||
srv.Shutdown()
|
||||
if srv.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
|
||||
}
|
||||
@@ -69,8 +69,7 @@ func TestRadiusAgentReloadStartShut(t *testing.T) {
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
|
||||
shdChan, nil, anz, srvDep)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(srv, sS, db)
|
||||
runtime.Gosched()
|
||||
time.Sleep(10 * time.Millisecond) //need to switch to gorutine
|
||||
if err := srv.Shutdown(); err != nil {
|
||||
@@ -126,8 +125,7 @@ func TestRadiusAgentReload1(t *testing.T) {
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(srv, sS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -197,8 +195,7 @@ func TestRadiusAgentReload2(t *testing.T) {
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewRadiusAgent(cfg, filterSChan, shdChan, nil, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(srv, sS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -74,8 +74,7 @@ func TestRalsReload(t *testing.T) {
|
||||
make(chan birpc.ClientConnector, 1),
|
||||
make(chan birpc.ClientConnector, 1),
|
||||
shdChan, nil, anz, srvDep, filterSChan)
|
||||
srvMngr.AddServices(ralS, schS, tS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db, stordb)
|
||||
srvMngr.AddServices(ralS, schS, tS, db, stordb)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -60,9 +60,7 @@ func TestDispatcherHReload(t *testing.T) {
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
connMngr := engine.NewConnManager(cfg, nil)
|
||||
srv := NewRegistrarCService(cfg, server, connMngr, anz, srvDep)
|
||||
srvMngr.AddServices(srv,
|
||||
NewLoaderService(cfg, db, filterSChan, server,
|
||||
make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(srv, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -61,8 +61,7 @@ func TestResourceSReload(t *testing.T) {
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
|
||||
reS := NewResourceService(cfg, db, chS, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(tS, reS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(tS, reS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -55,8 +55,7 @@ func TestRouteSReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, false, srvDep)
|
||||
routeS := NewRouteService(cfg, db, chS, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(routeS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(routeS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -54,8 +54,7 @@ func TestSchedulerSReload(t *testing.T) {
|
||||
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
|
||||
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(schS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(schS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -64,8 +64,7 @@ func TestSIPAgentReload(t *testing.T) {
|
||||
shdChan, nil, anz, srvDep)
|
||||
srv := NewSIPAgent(cfg, filterSChan, shdChan, nil, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(srv, sS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(srv, sS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -61,8 +61,7 @@ func TestStatSReload(t *testing.T) {
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
|
||||
sS := NewStatService(cfg, db, chS, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(tS, sS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(tS, sS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -61,9 +61,7 @@ func TestStorDBReload(t *testing.T) {
|
||||
cdrsRPC := make(chan birpc.ClientConnector, 1)
|
||||
cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server,
|
||||
cdrsRPC, nil, anz, srvDep)
|
||||
srvMngr.AddServices(cdrS, ralS, schS, chrS,
|
||||
NewLoaderService(cfg, db, filterSChan, server,
|
||||
make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db, stordb)
|
||||
srvMngr.AddServices(cdrS, ralS, schS, chrS, db, stordb)
|
||||
if err := engine.InitStorDb(cfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -57,8 +57,7 @@ func TestThresholdSReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, false, srvDep)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(tS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(tS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
@@ -127,8 +126,7 @@ func TestThresholdSReload2(t *testing.T) {
|
||||
db := NewDataDBService(cfg, nil, false, srvDep)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
|
||||
engine.NewConnManager(cfg, nil)
|
||||
srvMngr.AddServices(tS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
|
||||
srvMngr.AddServices(tS, db)
|
||||
if err := srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user