Tests in services

This commit is contained in:
root
2021-02-16 10:23:06 +02:00
committed by Dan Christian Bogos
parent 4a032177c3
commit dab5897a03
9 changed files with 298 additions and 55 deletions

View File

@@ -33,13 +33,13 @@ import (
// NewDispatcherService constructs a DispatcherService
func NewDispatcherService(dm *engine.DataManager,
cfg *config.CGRConfig, fltrS *engine.FilterS,
connMgr *engine.ConnManager) (*DispatcherService, error) {
connMgr *engine.ConnManager) *DispatcherService {
return &DispatcherService{
dm: dm,
cfg: cfg,
fltrS: fltrS,
connMgr: connMgr,
}, nil
}
}
// DispatcherService is the service handling dispatching towards internal components

View File

@@ -136,14 +136,14 @@ func (ts Thresholds) Sort() {
}
// NewThresholdService the constructor for ThresoldS service
func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) (tS *ThresholdService, err error) {
func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) (tS *ThresholdService) {
return &ThresholdService{dm: dm,
cgrcfg: cgrcfg,
filterS: filterS,
stopBackup: make(chan struct{}),
loopStoped: make(chan struct{}),
storedTdIDs: make(utils.StringSet),
}, nil
}
}
// ThresholdService manages Threshold execution and storing them to dataDB

View File

@@ -293,7 +293,7 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) {
defaultCfg.ThresholdSCfg().StoreInterval = 0
defaultCfg.ThresholdSCfg().StringIndexedFields = nil
defaultCfg.ThresholdSCfg().PrefixIndexedFields = nil
thServ, err = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg})
thServ = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg})
if err != nil {
t.Errorf("Error: %+v", err)
}
@@ -503,7 +503,7 @@ func TestThresholdsProcessEvent(t *testing.T) {
defaultCfg.ThresholdSCfg().StoreInterval = 0
defaultCfg.ThresholdSCfg().StringIndexedFields = nil
defaultCfg.ThresholdSCfg().PrefixIndexedFields = nil
thServ, err = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg})
thServ = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg})
if err != nil {
t.Errorf("Error: %+v", err)
}
@@ -704,7 +704,7 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) {
defaultCfg.ThresholdSCfg().StoreInterval = 0
defaultCfg.ThresholdSCfg().StringIndexedFields = nil
defaultCfg.ThresholdSCfg().PrefixIndexedFields = nil
thServ, err = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg})
thServ = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg})
if err != nil {
t.Errorf("Error: %+v", err)
}
@@ -925,7 +925,7 @@ func TestThresholdsProcessEvent2(t *testing.T) {
defaultCfg.ThresholdSCfg().StoreInterval = 0
defaultCfg.ThresholdSCfg().StringIndexedFields = nil
defaultCfg.ThresholdSCfg().PrefixIndexedFields = nil
thServ, err = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg})
thServ = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg})
if err != nil {
t.Errorf("Error: %+v", err)
}

View File

@@ -76,6 +76,7 @@ func (db *DataDBService) Start() (err error) {
db.dm = engine.NewDataManager(d, db.cfg.CacheCfg(), db.connMgr)
engine.SetDataStorage(db.dm)
if err = engine.CheckVersions(db.dm.DataDB()); err != nil {
fmt.Println(err)
return
}
db.dbchan <- db.dm

View File

@@ -100,39 +100,3 @@ func TestDiameterAgentReload1(t *testing.T) {
shdChan.CloseOnce()
time.Sleep(10 * time.Millisecond)
}
/*
func TestDiameterAgentReload2(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.SessionSCfg().Enabled = true
utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID)
utils.Logger.SetLogLevel(7)
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
shdChan := utils.NewSyncedChan()
chS := engine.NewCacheS(cfg, nil, nil)
cacheSChan := make(chan rpcclient.ClientConnector, 1)
cacheSChan <- chS
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewDiameterAgent(cfg, filterSChan, shdChan, nil, srvDep)
engine.NewConnManager(cfg, nil)
if srv.IsRunning() {
t.Errorf("Expected service to be down")
}
cfg.DiameterAgentCfg().ListenNet = "bad"
srv.Start()
time.Sleep(10 * time.Millisecond) //need to switch to gorutine
time.Sleep(10 * time.Millisecond)
cfg.DiameterAgentCfg().Enabled = false
cfg.GetReloadChan(config.DA_JSN) <- struct{}{}
time.Sleep(10 * time.Millisecond)
if srv.IsRunning() {
t.Errorf("Expected service to be down")
}
time.Sleep(10 * time.Millisecond)
}
*/

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package services
import (
"fmt"
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
@@ -87,10 +86,7 @@ func (dspS *DispatcherService) Start() (err error) {
dspS.Lock()
defer dspS.Unlock()
if dspS.dspS, err = dispatchers.NewDispatcherService(datadb, dspS.cfg, fltrS, dspS.connMgr); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error()))
return
}
dspS.dspS = dispatchers.NewDispatcherService(datadb, dspS.cfg, fltrS, dspS.connMgr)
// for the moment we dispable Apier through dispatcher
// until we figured out a better sollution in case of gob server

View File

@@ -805,3 +805,288 @@ func TestStorDBReload8(t *testing.T) {
shdChan.CloseOnce()
time.Sleep(10 * time.Millisecond)
}
func TestStorDBReloadVersion1(t *testing.T) {
cfg, err := config.NewCGRConfigFromPath(path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"))
if err != nil {
t.Fatal(err)
}
storageDb, err := engine.NewStorDBConn(cfg.StorDbCfg().Type,
cfg.StorDbCfg().Host, cfg.StorDbCfg().Port,
cfg.StorDbCfg().Name, cfg.StorDbCfg().User,
cfg.StorDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
cfg.StorDbCfg().StringIndexedFields, cfg.StorDbCfg().PrefixIndexedFields,
cfg.StorDbCfg().Opts)
if err != nil {
t.Fatal(err)
}
defer func() {
storageDb.Flush("")
storageDb.Close()
}()
err = storageDb.SetVersions(engine.Versions{
utils.CostDetails: 2,
utils.SessionSCosts: 3,
//old version for CDRs
utils.CDRs: 1,
utils.TpRatingPlans: 1,
utils.TpFilters: 1,
utils.TpDestinationRates: 1,
utils.TpActionTriggers: 1,
utils.TpAccountActionsV: 1,
utils.TpActionPlans: 1,
utils.TpActions: 1,
utils.TpThresholds: 1,
utils.TpRoutes: 1,
utils.TpStats: 1,
utils.TpSharedGroups: 1,
utils.TpRatingProfiles: 1,
utils.TpResources: 1,
utils.TpRates: 1,
utils.TpTiming: 1,
utils.TpResource: 1,
utils.TpDestinations: 1,
utils.TpRatingPlan: 1,
utils.TpRatingProfile: 1,
utils.TpChargers: 1,
utils.TpDispatchers: 1,
utils.TpRateProfiles: 1,
utils.TpActionProfiles: 1,
}, true)
utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID)
utils.Logger.SetLogLevel(7)
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
shdChan := utils.NewSyncedChan()
shdWg := new(sync.WaitGroup)
chS := engine.NewCacheS(cfg, nil, nil)
cfg.ChargerSCfg().Enabled = true
server := cores.NewServer(nil)
srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg)
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
db := NewDataDBService(cfg, nil, srvDep)
cfg.StorDbCfg().Password = "CGRateS.org"
stordb := NewStorDBService(cfg, srvDep)
stordb.oldDBCfg = cfg.StorDbCfg().Clone()
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep)
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep)
ralS := NewRalService(cfg, chS, server,
make(chan rpcclient.ClientConnector, 1),
make(chan rpcclient.ClientConnector, 1),
shdChan, nil, anz, srvDep)
cdrsRPC := make(chan rpcclient.ClientConnector, 1)
cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server,
cdrsRPC, nil, anz, srvDep)
srvMngr.AddServices(cdrS, ralS, schS, chrS,
NewLoaderService(cfg, db, filterSChan, server,
make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db, stordb)
if err := srvMngr.StartServices(); err != nil {
t.Error(err)
}
stordb.db = nil
err = stordb.Reload()
if err == nil || err.Error() != "can't conver StorDB of type mongo to MongoStorage" {
t.Fatal(err)
}
cfg.CdrsCfg().Enabled = false
cfg.GetReloadChan(config.CDRS_JSN) <- struct{}{}
time.Sleep(10 * time.Millisecond)
shdChan.CloseOnce()
time.Sleep(10 * time.Millisecond)
}
func TestStorDBReloadVersion2(t *testing.T) {
cfg, err := config.NewCGRConfigFromPath(path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmysql"))
if err != nil {
t.Fatal(err)
}
storageDb, err := engine.NewStorDBConn(cfg.StorDbCfg().Type,
cfg.StorDbCfg().Host, cfg.StorDbCfg().Port,
cfg.StorDbCfg().Name, cfg.StorDbCfg().User,
cfg.StorDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
cfg.StorDbCfg().StringIndexedFields, cfg.StorDbCfg().PrefixIndexedFields,
cfg.StorDbCfg().Opts)
if err != nil {
t.Fatal(err)
}
defer func() {
storageDb.Flush("")
storageDb.Close()
}()
err = storageDb.SetVersions(engine.Versions{
utils.CostDetails: 2,
utils.SessionSCosts: 3,
//old version for CDRs
utils.CDRs: 1,
utils.TpRatingPlans: 1,
utils.TpFilters: 1,
utils.TpDestinationRates: 1,
utils.TpActionTriggers: 1,
utils.TpAccountActionsV: 1,
utils.TpActionPlans: 1,
utils.TpActions: 1,
utils.TpThresholds: 1,
utils.TpRoutes: 1,
utils.TpStats: 1,
utils.TpSharedGroups: 1,
utils.TpRatingProfiles: 1,
utils.TpResources: 1,
utils.TpRates: 1,
utils.TpTiming: 1,
utils.TpResource: 1,
utils.TpDestinations: 1,
utils.TpRatingPlan: 1,
utils.TpRatingProfile: 1,
utils.TpChargers: 1,
utils.TpDispatchers: 1,
utils.TpRateProfiles: 1,
utils.TpActionProfiles: 1,
}, true)
utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID)
utils.Logger.SetLogLevel(7)
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
shdChan := utils.NewSyncedChan()
shdWg := new(sync.WaitGroup)
chS := engine.NewCacheS(cfg, nil, nil)
cfg.ChargerSCfg().Enabled = true
server := cores.NewServer(nil)
srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg)
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
db := NewDataDBService(cfg, nil, srvDep)
cfg.StorDbCfg().Password = "CGRateS.org"
stordb := NewStorDBService(cfg, srvDep)
stordb.oldDBCfg = cfg.StorDbCfg().Clone()
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep)
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep)
ralS := NewRalService(cfg, chS, server,
make(chan rpcclient.ClientConnector, 1),
make(chan rpcclient.ClientConnector, 1),
shdChan, nil, anz, srvDep)
cdrsRPC := make(chan rpcclient.ClientConnector, 1)
cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server,
cdrsRPC, nil, anz, srvDep)
srvMngr.AddServices(cdrS, ralS, schS, chrS,
NewLoaderService(cfg, db, filterSChan, server,
make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db, stordb)
if err := srvMngr.StartServices(); err != nil {
t.Error(err)
}
stordb.db = nil
err = stordb.Reload()
if err == nil || err.Error() != "can't conver StorDB of type mysql to SQLStorage" {
t.Fatal(err)
}
cfg.CdrsCfg().Enabled = false
cfg.GetReloadChan(config.CDRS_JSN) <- struct{}{}
time.Sleep(10 * time.Millisecond)
shdChan.CloseOnce()
time.Sleep(10 * time.Millisecond)
}
func TestStorDBReloadVersion3(t *testing.T) {
cfg, err := config.NewCGRConfigFromPath(path.Join("/usr", "share", "cgrates", "conf", "samples", "tutinternal"))
if err != nil {
t.Fatal(err)
}
storageDb, err := engine.NewStorDBConn(cfg.StorDbCfg().Type,
cfg.StorDbCfg().Host, cfg.StorDbCfg().Port,
cfg.StorDbCfg().Name, cfg.StorDbCfg().User,
cfg.StorDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
cfg.StorDbCfg().StringIndexedFields, cfg.StorDbCfg().PrefixIndexedFields,
cfg.StorDbCfg().Opts)
if err != nil {
t.Fatal(err)
}
defer func() {
storageDb.Flush("")
storageDb.Close()
}()
err = storageDb.SetVersions(engine.Versions{
utils.CostDetails: 2,
utils.SessionSCosts: 3,
//old version for CDRs
utils.CDRs: 1,
utils.TpRatingPlans: 1,
utils.TpFilters: 1,
utils.TpDestinationRates: 1,
utils.TpActionTriggers: 1,
utils.TpAccountActionsV: 1,
utils.TpActionPlans: 1,
utils.TpActions: 1,
utils.TpThresholds: 1,
utils.TpRoutes: 1,
utils.TpStats: 1,
utils.TpSharedGroups: 1,
utils.TpRatingProfiles: 1,
utils.TpResources: 1,
utils.TpRates: 1,
utils.TpTiming: 1,
utils.TpResource: 1,
utils.TpDestinations: 1,
utils.TpRatingPlan: 1,
utils.TpRatingProfile: 1,
utils.TpChargers: 1,
utils.TpDispatchers: 1,
utils.TpRateProfiles: 1,
utils.TpActionProfiles: 1,
}, true)
utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID)
utils.Logger.SetLogLevel(7)
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
shdChan := utils.NewSyncedChan()
shdWg := new(sync.WaitGroup)
chS := engine.NewCacheS(cfg, nil, nil)
cfg.ChargerSCfg().Enabled = true
server := cores.NewServer(nil)
srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg)
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
db := NewDataDBService(cfg, nil, srvDep)
cfg.StorDbCfg().Password = "CGRateS.org"
stordb := NewStorDBService(cfg, srvDep)
stordb.oldDBCfg = cfg.StorDbCfg().Clone()
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep)
chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep)
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep)
ralS := NewRalService(cfg, chS, server,
make(chan rpcclient.ClientConnector, 1),
make(chan rpcclient.ClientConnector, 1),
shdChan, nil, anz, srvDep)
cdrsRPC := make(chan rpcclient.ClientConnector, 1)
cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server,
cdrsRPC, nil, anz, srvDep)
srvMngr.AddServices(cdrS, ralS, schS, chrS,
NewLoaderService(cfg, db, filterSChan, server,
make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db, stordb)
if err := srvMngr.StartServices(); err != nil {
t.Error(err)
}
stordb.db = nil
err = stordb.Reload()
if err == nil || err.Error() != "can't conver StorDB of type internal to InternalDB" {
t.Fatal(err)
}
cfg.CdrsCfg().Enabled = false
cfg.GetReloadChan(config.CDRS_JSN) <- struct{}{}
time.Sleep(10 * time.Millisecond)
shdChan.CloseOnce()
time.Sleep(10 * time.Millisecond)
}

View File

@@ -83,11 +83,8 @@ func (thrs *ThresholdService) Start() (err error) {
thrs.Lock()
defer thrs.Unlock()
thrs.thrs, err = engine.NewThresholdService(datadb, thrs.cfg, filterS)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ThresholdS, err.Error()))
return
}
thrs.thrs = engine.NewThresholdService(datadb, thrs.cfg, filterS)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS))
thrs.thrs.StartLoop()
thrs.rpc = v1.NewThresholdSv1(thrs.thrs)

View File

@@ -44,7 +44,7 @@ func TestThresholdSCoverage(t *testing.T) {
if tS.IsRunning() {
t.Errorf("Expected service to be down")
}
thrs1, _ := engine.NewThresholdService(&engine.DataManager{}, &config.CGRConfig{}, &engine.FilterS{})
thrs1 := engine.NewThresholdService(&engine.DataManager{}, &config.CGRConfig{}, &engine.FilterS{})
tS2 := &ThresholdService{
cfg: cfg,
dm: db,