From dab5897a0380e8e3425a6cb25ad8a4cd1031468f Mon Sep 17 00:00:00 2001 From: root Date: Tue, 16 Feb 2021 10:23:06 +0200 Subject: [PATCH] Tests in services --- dispatchers/dispatchers.go | 4 +- engine/thresholds.go | 4 +- engine/thresholds_test.go | 8 +- services/datadb.go | 1 + services/diameteragent_it_test.go | 36 ---- services/dispatchers.go | 6 +- services/stordb_it_test.go | 285 ++++++++++++++++++++++++++++++ services/thresholds.go | 7 +- services/thresholds_test.go | 2 +- 9 files changed, 298 insertions(+), 55 deletions(-) diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 71b67cc6c..3c025d1b1 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -33,13 +33,13 @@ import ( // NewDispatcherService constructs a DispatcherService func NewDispatcherService(dm *engine.DataManager, cfg *config.CGRConfig, fltrS *engine.FilterS, - connMgr *engine.ConnManager) (*DispatcherService, error) { + connMgr *engine.ConnManager) *DispatcherService { return &DispatcherService{ dm: dm, cfg: cfg, fltrS: fltrS, connMgr: connMgr, - }, nil + } } // DispatcherService is the service handling dispatching towards internal components diff --git a/engine/thresholds.go b/engine/thresholds.go index 8f250f501..e3ab4bce0 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -136,14 +136,14 @@ func (ts Thresholds) Sort() { } // NewThresholdService the constructor for ThresoldS service -func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) (tS *ThresholdService, err error) { +func NewThresholdService(dm *DataManager, cgrcfg *config.CGRConfig, filterS *FilterS) (tS *ThresholdService) { return &ThresholdService{dm: dm, cgrcfg: cgrcfg, filterS: filterS, stopBackup: make(chan struct{}), loopStoped: make(chan struct{}), storedTdIDs: make(utils.StringSet), - }, nil + } } // ThresholdService manages Threshold execution and storing them to dataDB diff --git a/engine/thresholds_test.go b/engine/thresholds_test.go index 5e5b80058..b808cdb95 100644 --- a/engine/thresholds_test.go +++ b/engine/thresholds_test.go @@ -293,7 +293,7 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) { defaultCfg.ThresholdSCfg().StoreInterval = 0 defaultCfg.ThresholdSCfg().StringIndexedFields = nil defaultCfg.ThresholdSCfg().PrefixIndexedFields = nil - thServ, err = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg}) + thServ = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg}) if err != nil { t.Errorf("Error: %+v", err) } @@ -503,7 +503,7 @@ func TestThresholdsProcessEvent(t *testing.T) { defaultCfg.ThresholdSCfg().StoreInterval = 0 defaultCfg.ThresholdSCfg().StringIndexedFields = nil defaultCfg.ThresholdSCfg().PrefixIndexedFields = nil - thServ, err = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg}) + thServ = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg}) if err != nil { t.Errorf("Error: %+v", err) } @@ -704,7 +704,7 @@ func TestThresholdsVerifyIfExecuted(t *testing.T) { defaultCfg.ThresholdSCfg().StoreInterval = 0 defaultCfg.ThresholdSCfg().StringIndexedFields = nil defaultCfg.ThresholdSCfg().PrefixIndexedFields = nil - thServ, err = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg}) + thServ = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg}) if err != nil { t.Errorf("Error: %+v", err) } @@ -925,7 +925,7 @@ func TestThresholdsProcessEvent2(t *testing.T) { defaultCfg.ThresholdSCfg().StoreInterval = 0 defaultCfg.ThresholdSCfg().StringIndexedFields = nil defaultCfg.ThresholdSCfg().PrefixIndexedFields = nil - thServ, err = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg}) + thServ = NewThresholdService(dmTH, defaultCfg, &FilterS{dm: dmTH, cfg: defaultCfg}) if err != nil { t.Errorf("Error: %+v", err) } diff --git a/services/datadb.go b/services/datadb.go index bdb0dcbfd..588072b6a 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -76,6 +76,7 @@ func (db *DataDBService) Start() (err error) { db.dm = engine.NewDataManager(d, db.cfg.CacheCfg(), db.connMgr) engine.SetDataStorage(db.dm) if err = engine.CheckVersions(db.dm.DataDB()); err != nil { + fmt.Println(err) return } db.dbchan <- db.dm diff --git a/services/diameteragent_it_test.go b/services/diameteragent_it_test.go index 7cf1e045d..d45346d9a 100644 --- a/services/diameteragent_it_test.go +++ b/services/diameteragent_it_test.go @@ -100,39 +100,3 @@ func TestDiameterAgentReload1(t *testing.T) { shdChan.CloseOnce() time.Sleep(10 * time.Millisecond) } - -/* -func TestDiameterAgentReload2(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - cfg.SessionSCfg().Enabled = true - utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) - utils.Logger.SetLogLevel(7) - filterSChan := make(chan *engine.FilterS, 1) - filterSChan <- nil - shdChan := utils.NewSyncedChan() - chS := engine.NewCacheS(cfg, nil, nil) - cacheSChan := make(chan rpcclient.ClientConnector, 1) - cacheSChan <- chS - srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewDiameterAgent(cfg, filterSChan, shdChan, nil, srvDep) - engine.NewConnManager(cfg, nil) - if srv.IsRunning() { - t.Errorf("Expected service to be down") - } - cfg.DiameterAgentCfg().ListenNet = "bad" - srv.Start() - - time.Sleep(10 * time.Millisecond) //need to switch to gorutine - - time.Sleep(10 * time.Millisecond) - cfg.DiameterAgentCfg().Enabled = false - cfg.GetReloadChan(config.DA_JSN) <- struct{}{} - time.Sleep(10 * time.Millisecond) - - if srv.IsRunning() { - t.Errorf("Expected service to be down") - } - - time.Sleep(10 * time.Millisecond) -} -*/ diff --git a/services/dispatchers.go b/services/dispatchers.go index d2596645e..ede861874 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -19,7 +19,6 @@ along with this program. If not, see package services import ( - "fmt" "sync" v1 "github.com/cgrates/cgrates/apier/v1" @@ -87,10 +86,7 @@ func (dspS *DispatcherService) Start() (err error) { dspS.Lock() defer dspS.Unlock() - if dspS.dspS, err = dispatchers.NewDispatcherService(datadb, dspS.cfg, fltrS, dspS.connMgr); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error())) - return - } + dspS.dspS = dispatchers.NewDispatcherService(datadb, dspS.cfg, fltrS, dspS.connMgr) // for the moment we dispable Apier through dispatcher // until we figured out a better sollution in case of gob server diff --git a/services/stordb_it_test.go b/services/stordb_it_test.go index 7f37a301c..beb3e81fa 100644 --- a/services/stordb_it_test.go +++ b/services/stordb_it_test.go @@ -805,3 +805,288 @@ func TestStorDBReload8(t *testing.T) { shdChan.CloseOnce() time.Sleep(10 * time.Millisecond) } + +func TestStorDBReloadVersion1(t *testing.T) { + cfg, err := config.NewCGRConfigFromPath(path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo")) + if err != nil { + t.Fatal(err) + } + + storageDb, err := engine.NewStorDBConn(cfg.StorDbCfg().Type, + cfg.StorDbCfg().Host, cfg.StorDbCfg().Port, + cfg.StorDbCfg().Name, cfg.StorDbCfg().User, + cfg.StorDbCfg().Password, cfg.GeneralCfg().DBDataEncoding, + cfg.StorDbCfg().StringIndexedFields, cfg.StorDbCfg().PrefixIndexedFields, + cfg.StorDbCfg().Opts) + if err != nil { + t.Fatal(err) + } + + defer func() { + storageDb.Flush("") + storageDb.Close() + }() + + err = storageDb.SetVersions(engine.Versions{ + utils.CostDetails: 2, + utils.SessionSCosts: 3, + //old version for CDRs + utils.CDRs: 1, + utils.TpRatingPlans: 1, + utils.TpFilters: 1, + utils.TpDestinationRates: 1, + utils.TpActionTriggers: 1, + utils.TpAccountActionsV: 1, + utils.TpActionPlans: 1, + utils.TpActions: 1, + utils.TpThresholds: 1, + utils.TpRoutes: 1, + utils.TpStats: 1, + utils.TpSharedGroups: 1, + utils.TpRatingProfiles: 1, + utils.TpResources: 1, + utils.TpRates: 1, + utils.TpTiming: 1, + utils.TpResource: 1, + utils.TpDestinations: 1, + utils.TpRatingPlan: 1, + utils.TpRatingProfile: 1, + utils.TpChargers: 1, + utils.TpDispatchers: 1, + utils.TpRateProfiles: 1, + utils.TpActionProfiles: 1, + }, true) + + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) + cfg.ChargerSCfg().Enabled = true + server := cores.NewServer(nil) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + db := NewDataDBService(cfg, nil, srvDep) + cfg.StorDbCfg().Password = "CGRateS.org" + stordb := NewStorDBService(cfg, srvDep) + stordb.oldDBCfg = cfg.StorDbCfg().Clone() + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) + chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep) + schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep) + ralS := NewRalService(cfg, chS, server, + make(chan rpcclient.ClientConnector, 1), + make(chan rpcclient.ClientConnector, 1), + shdChan, nil, anz, srvDep) + cdrsRPC := make(chan rpcclient.ClientConnector, 1) + cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server, + cdrsRPC, nil, anz, srvDep) + srvMngr.AddServices(cdrS, ralS, schS, chrS, + NewLoaderService(cfg, db, filterSChan, server, + make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db, stordb) + if err := srvMngr.StartServices(); err != nil { + t.Error(err) + } + stordb.db = nil + err = stordb.Reload() + if err == nil || err.Error() != "can't conver StorDB of type mongo to MongoStorage" { + t.Fatal(err) + } + + cfg.CdrsCfg().Enabled = false + cfg.GetReloadChan(config.CDRS_JSN) <- struct{}{} + time.Sleep(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) +} + +func TestStorDBReloadVersion2(t *testing.T) { + cfg, err := config.NewCGRConfigFromPath(path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmysql")) + if err != nil { + t.Fatal(err) + } + + storageDb, err := engine.NewStorDBConn(cfg.StorDbCfg().Type, + cfg.StorDbCfg().Host, cfg.StorDbCfg().Port, + cfg.StorDbCfg().Name, cfg.StorDbCfg().User, + cfg.StorDbCfg().Password, cfg.GeneralCfg().DBDataEncoding, + cfg.StorDbCfg().StringIndexedFields, cfg.StorDbCfg().PrefixIndexedFields, + cfg.StorDbCfg().Opts) + if err != nil { + t.Fatal(err) + } + + defer func() { + storageDb.Flush("") + storageDb.Close() + }() + + err = storageDb.SetVersions(engine.Versions{ + utils.CostDetails: 2, + utils.SessionSCosts: 3, + //old version for CDRs + utils.CDRs: 1, + utils.TpRatingPlans: 1, + utils.TpFilters: 1, + utils.TpDestinationRates: 1, + utils.TpActionTriggers: 1, + utils.TpAccountActionsV: 1, + utils.TpActionPlans: 1, + utils.TpActions: 1, + utils.TpThresholds: 1, + utils.TpRoutes: 1, + utils.TpStats: 1, + utils.TpSharedGroups: 1, + utils.TpRatingProfiles: 1, + utils.TpResources: 1, + utils.TpRates: 1, + utils.TpTiming: 1, + utils.TpResource: 1, + utils.TpDestinations: 1, + utils.TpRatingPlan: 1, + utils.TpRatingProfile: 1, + utils.TpChargers: 1, + utils.TpDispatchers: 1, + utils.TpRateProfiles: 1, + utils.TpActionProfiles: 1, + }, true) + + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) + cfg.ChargerSCfg().Enabled = true + server := cores.NewServer(nil) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + db := NewDataDBService(cfg, nil, srvDep) + cfg.StorDbCfg().Password = "CGRateS.org" + stordb := NewStorDBService(cfg, srvDep) + stordb.oldDBCfg = cfg.StorDbCfg().Clone() + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) + chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep) + schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep) + ralS := NewRalService(cfg, chS, server, + make(chan rpcclient.ClientConnector, 1), + make(chan rpcclient.ClientConnector, 1), + shdChan, nil, anz, srvDep) + cdrsRPC := make(chan rpcclient.ClientConnector, 1) + cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server, + cdrsRPC, nil, anz, srvDep) + srvMngr.AddServices(cdrS, ralS, schS, chrS, + NewLoaderService(cfg, db, filterSChan, server, + make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db, stordb) + if err := srvMngr.StartServices(); err != nil { + t.Error(err) + } + stordb.db = nil + err = stordb.Reload() + if err == nil || err.Error() != "can't conver StorDB of type mysql to SQLStorage" { + t.Fatal(err) + } + + cfg.CdrsCfg().Enabled = false + cfg.GetReloadChan(config.CDRS_JSN) <- struct{}{} + time.Sleep(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) +} + +func TestStorDBReloadVersion3(t *testing.T) { + cfg, err := config.NewCGRConfigFromPath(path.Join("/usr", "share", "cgrates", "conf", "samples", "tutinternal")) + if err != nil { + t.Fatal(err) + } + + storageDb, err := engine.NewStorDBConn(cfg.StorDbCfg().Type, + cfg.StorDbCfg().Host, cfg.StorDbCfg().Port, + cfg.StorDbCfg().Name, cfg.StorDbCfg().User, + cfg.StorDbCfg().Password, cfg.GeneralCfg().DBDataEncoding, + cfg.StorDbCfg().StringIndexedFields, cfg.StorDbCfg().PrefixIndexedFields, + cfg.StorDbCfg().Opts) + if err != nil { + t.Fatal(err) + } + + defer func() { + storageDb.Flush("") + storageDb.Close() + }() + + err = storageDb.SetVersions(engine.Versions{ + utils.CostDetails: 2, + utils.SessionSCosts: 3, + //old version for CDRs + utils.CDRs: 1, + utils.TpRatingPlans: 1, + utils.TpFilters: 1, + utils.TpDestinationRates: 1, + utils.TpActionTriggers: 1, + utils.TpAccountActionsV: 1, + utils.TpActionPlans: 1, + utils.TpActions: 1, + utils.TpThresholds: 1, + utils.TpRoutes: 1, + utils.TpStats: 1, + utils.TpSharedGroups: 1, + utils.TpRatingProfiles: 1, + utils.TpResources: 1, + utils.TpRates: 1, + utils.TpTiming: 1, + utils.TpResource: 1, + utils.TpDestinations: 1, + utils.TpRatingPlan: 1, + utils.TpRatingProfile: 1, + utils.TpChargers: 1, + utils.TpDispatchers: 1, + utils.TpRateProfiles: 1, + utils.TpActionProfiles: 1, + }, true) + + utils.Logger, _ = utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + shdChan := utils.NewSyncedChan() + shdWg := new(sync.WaitGroup) + chS := engine.NewCacheS(cfg, nil, nil) + cfg.ChargerSCfg().Enabled = true + server := cores.NewServer(nil) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg) + srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} + db := NewDataDBService(cfg, nil, srvDep) + cfg.StorDbCfg().Password = "CGRateS.org" + stordb := NewStorDBService(cfg, srvDep) + stordb.oldDBCfg = cfg.StorDbCfg().Clone() + anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) + chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep) + schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep) + ralS := NewRalService(cfg, chS, server, + make(chan rpcclient.ClientConnector, 1), + make(chan rpcclient.ClientConnector, 1), + shdChan, nil, anz, srvDep) + cdrsRPC := make(chan rpcclient.ClientConnector, 1) + cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server, + cdrsRPC, nil, anz, srvDep) + srvMngr.AddServices(cdrS, ralS, schS, chrS, + NewLoaderService(cfg, db, filterSChan, server, + make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db, stordb) + if err := srvMngr.StartServices(); err != nil { + t.Error(err) + } + stordb.db = nil + err = stordb.Reload() + if err == nil || err.Error() != "can't conver StorDB of type internal to InternalDB" { + t.Fatal(err) + } + + cfg.CdrsCfg().Enabled = false + cfg.GetReloadChan(config.CDRS_JSN) <- struct{}{} + time.Sleep(10 * time.Millisecond) + shdChan.CloseOnce() + time.Sleep(10 * time.Millisecond) +} diff --git a/services/thresholds.go b/services/thresholds.go index f4d4aa65a..5e35a8158 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -83,11 +83,8 @@ func (thrs *ThresholdService) Start() (err error) { thrs.Lock() defer thrs.Unlock() - thrs.thrs, err = engine.NewThresholdService(datadb, thrs.cfg, filterS) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ThresholdS, err.Error())) - return - } + thrs.thrs = engine.NewThresholdService(datadb, thrs.cfg, filterS) + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS)) thrs.thrs.StartLoop() thrs.rpc = v1.NewThresholdSv1(thrs.thrs) diff --git a/services/thresholds_test.go b/services/thresholds_test.go index 15b5774ac..f270919dc 100644 --- a/services/thresholds_test.go +++ b/services/thresholds_test.go @@ -44,7 +44,7 @@ func TestThresholdSCoverage(t *testing.T) { if tS.IsRunning() { t.Errorf("Expected service to be down") } - thrs1, _ := engine.NewThresholdService(&engine.DataManager{}, &config.CGRConfig{}, &engine.FilterS{}) + thrs1 := engine.NewThresholdService(&engine.DataManager{}, &config.CGRConfig{}, &engine.FilterS{}) tS2 := &ThresholdService{ cfg: cfg, dm: db,