diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 31af38dfe..e32883cee 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -521,7 +521,7 @@ func main() { // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, exitChan) - attrS := services.NewAttributeService(cfg, dm, cacheS, filterSChan, server) + attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server) dspS := services.NewDispatcherService(cfg, dm, cacheS, filterSChan, server, attrS.GetIntenternalChan()) chrS := services.NewChargerService(cfg, dm, cacheS, filterSChan, server, attrS.GetIntenternalChan(), dspS.GetIntenternalChan()) diff --git a/config/config.go b/config/config.go index 4c80fd8cd..a7bee3da3 100755 --- a/config/config.go +++ b/config/config.go @@ -1516,6 +1516,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case DATADB_JSN: + cfg.rldChans[DATADB_JSN] <- struct{}{} if !fall { break } @@ -1618,6 +1619,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case ATTRIBUTE_JSN: + cfg.rldChans[DATADB_JSN] <- struct{}{} // reload datadb before cfg.rldChans[ATTRIBUTE_JSN] <- struct{}{} if !fall { break diff --git a/services/attributes.go b/services/attributes.go index 3fe69368e..0d123bfd4 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -31,7 +31,7 @@ import ( ) // NewAttributeService returns the Attribute Service -func NewAttributeService(cfg *config.CGRConfig, dm *engine.DataManager, +func NewAttributeService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, server *utils.Server) servmanager.Service { return &AttributeService{ @@ -48,7 +48,7 @@ func NewAttributeService(cfg *config.CGRConfig, dm *engine.DataManager, type AttributeService struct { sync.RWMutex cfg *config.CGRConfig - dm *engine.DataManager + dm *DataDBService cacheS *engine.CacheS filterSChan chan *engine.FilterS server *utils.Server @@ -72,7 +72,7 @@ func (attrS *AttributeService) Start() (err error) { attrS.Lock() defer attrS.Unlock() - attrS.attrS, err = engine.NewAttributeService(attrS.dm, filterS, attrS.cfg) + attrS.attrS, err = engine.NewAttributeService(attrS.dm.GetDM(), filterS, attrS.cfg) if err != nil { utils.Logger.Crit( fmt.Sprintf("<%s> Could not init, error: %s", diff --git a/services/attributes_it_test.go b/services/attributes_it_test.go index 39fdffd3f..d23899390 100644 --- a/services/attributes_it_test.go +++ b/services/attributes_it_test.go @@ -46,15 +46,19 @@ func TestAttributeSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) - attrS := NewAttributeService(cfg, nil, + db := NewDataDBService(cfg) + attrS := NewAttributeService(cfg, db, chS, filterSChan, server) - srvMngr.AddServices(attrS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown)) + srvMngr.AddServices(attrS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } if attrS.IsRunning() { t.Errorf("Expected service to be down") } + if db.IsRunning() { + t.Errorf("Expected service to be down") + } var reply string if err := cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{ Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"), @@ -68,6 +72,9 @@ func TestAttributeSReload(t *testing.T) { if !attrS.IsRunning() { t.Errorf("Expected service to be running") } + if !db.IsRunning() { + t.Errorf("Expected service to be running") + } cfg.AttributeSCfg().Enabled = false cfg.GetReloadChan(config.ATTRIBUTE_JSN) <- struct{}{} time.Sleep(10 * time.Millisecond) diff --git a/services/chargers_it_test.go b/services/chargers_it_test.go index b03946796..405d1096a 100644 --- a/services/chargers_it_test.go +++ b/services/chargers_it_test.go @@ -48,9 +48,10 @@ func TestChargerSReload(t *testing.T) { filterSChan <- nil server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) - attrS := NewAttributeService(cfg, nil, chS, filterSChan, server) + db := NewDataDBService(cfg) + attrS := NewAttributeService(cfg, db, chS, filterSChan, server) chrS := NewChargerService(cfg, nil, chS, filterSChan, server, attrS.GetIntenternalChan(), nil) - srvMngr.AddServices(attrS, chrS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown)) + srvMngr.AddServices(attrS, chrS, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/services/datadb_it_test.go b/services/datadb_it_test.go new file mode 100644 index 000000000..2b5e83cb2 --- /dev/null +++ b/services/datadb_it_test.go @@ -0,0 +1,91 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package services + +import ( + "path" + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +func TestDataDBReload(t *testing.T) { + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + + engineShutdown := make(chan bool, 1) + chS := engine.NewCacheS(cfg, nil) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) + close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) + server := utils.NewServer() + srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + db := NewDataDBService(cfg) + srvMngr.AddServices(NewAttributeService(cfg, db, + chS, filterSChan, server), NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) + if err = srvMngr.StartServices(); err != nil { + t.Error(err) + } + if db.IsRunning() { + t.Errorf("Expected service to be down") + } + var reply string + cfg.AttributeSCfg().Enabled = true + if err := cfg.V1ReloadConfig(&config.ConfigReloadWithArgDispatcher{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"), + Section: config.DATADB_JSN, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expecting OK ,received %s", reply) + } + time.Sleep(10 * time.Millisecond) //need to switch to gorutine + if !db.IsRunning() { + t.Errorf("Expected service to be running") + } + oldcfg := &config.DataDbCfg{ + DataDbType: utils.MONGO, + DataDbHost: "127.0.0.1", + DataDbPort: "27017", + DataDbName: "10", + DataDbUser: "cgrates", + QueryTimeout: 10 * time.Second, + } + if !reflect.DeepEqual(oldcfg, db.oldDBCfg) { + t.Errorf("Expected %s received:%s", utils.ToJSON(oldcfg), utils.ToJSON(db.oldDBCfg)) + } + cfg.AttributeSCfg().Enabled = false + cfg.GetReloadChan(config.DATADB_JSN) <- struct{}{} + time.Sleep(10 * time.Millisecond) + if db.IsRunning() { + t.Errorf("Expected service to be down") + } + engineShutdown <- true +} diff --git a/services/dispatchers_it_test.go b/services/dispatchers_it_test.go index 3826508f0..c6e684a0a 100644 --- a/services/dispatchers_it_test.go +++ b/services/dispatchers_it_test.go @@ -49,9 +49,10 @@ func TestDispatcherSReload(t *testing.T) { filterSChan <- nil server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) - attrS := NewAttributeService(cfg, nil, chS, filterSChan, server) + db := NewDataDBService(cfg) + attrS := NewAttributeService(cfg, db, chS, filterSChan, server) srv := NewDispatcherService(cfg, nil, chS, filterSChan, server, attrS.GetIntenternalChan()) - srvMngr.AddServices(attrS, srv, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown)) + srvMngr.AddServices(attrS, srv, NewLoaderService(cfg, nil, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 11d6e8c75..ab226ddcc 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -293,6 +293,10 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(utils.DispatcherS); err != nil { return } + case <-srvMngr.GetConfig().GetReloadChan(config.DATADB_JSN): + if err = srvMngr.reloadService(utils.DataDB); err != nil { + return + } } // handle RPC server }