From 548f895a87bcf0c7fb222b5fa4ad665e65dfb24a Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 9 Jan 2020 12:09:02 +0200 Subject: [PATCH] Updated stordb reload --- apier/v1/apier.go | 21 ++++++-- engine/cdrs.go | 31 +++++++---- services/apierv1.go | 121 ++++++++++++++++++++----------------------- services/cdrs.go | 105 ++++++++++++++++--------------------- services/dnsagent.go | 8 ++- 5 files changed, 145 insertions(+), 141 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 3d39afa18..b4bde4038 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -50,6 +50,8 @@ type ApierV1 struct { HTTPPoster *engine.HTTPPoster FilterS *engine.FilterS //Used for CDR Exporter ConnMgr *engine.ConnManager + + StorDBChan chan engine.StorDB } // Call implements rpcclient.ClientConnector interface for internal RPC @@ -1373,9 +1375,18 @@ func (apiv1 *ApierV1) GetRatingPlanIDs(args utils.TenantArgWithPaginator, attrPr return nil } -// SetStorDB sets the new connection for StorDB -// only used on reload -func (apiv1 *ApierV1) SetStorDB(storDB engine.StorDB) { - apiv1.CdrDb = storDB - apiv1.StorDb = storDB +// ListenAndServe listen for storbd reload +func (apiv1 *ApierV1) ListenAndServe(stopChan chan struct{}) (err error) { + for { + select { + case <-stopChan: + return + case stordb, ok := <-apiv1.StorDBChan: + if !ok { // the chanel was closed by the shutdown of stordbService + return + } + apiv1.CdrDb = stordb + apiv1.StorDb = stordb + } + } } diff --git a/engine/cdrs.go b/engine/cdrs.go index 832c0d425..0d263bac3 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -68,9 +68,9 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { } // NewCDRServer is a constructor for CDRServer -func NewCDRServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, filterS *FilterS, +func NewCDRServer(cgrCfg *config.CGRConfig, storDBChan chan StorDB, dm *DataManager, filterS *FilterS, connMgr *ConnManager) *CDRServer { - + cdrDb := <-storDBChan return &CDRServer{ cgrCfg: cgrCfg, cdrDb: cdrDb, @@ -78,8 +78,9 @@ func NewCDRServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, f guard: guardian.Guardian, httpPoster: NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify, cgrCfg.GeneralCfg().ReplyTimeout), - filterS: filterS, - connMgr: connMgr, + filterS: filterS, + connMgr: connMgr, + storDBChan: storDBChan, } } @@ -92,6 +93,22 @@ type CDRServer struct { httpPoster *HTTPPoster // used for replication filterS *FilterS connMgr *ConnManager + storDBChan chan StorDB +} + +// ListenAndServe listen for storbd reload +func (cdrS *CDRServer) ListenAndServe(stopChan chan struct{}) (err error) { + for { + select { + case <-stopChan: + return + case stordb, ok := <-cdrS.storDBChan: + if !ok { // the chanel was closed by the shutdown of stordbService + return + } + cdrS.cdrDb = stordb + } + } } // RegisterHandlersToServer is called by cgr-engine to register HTTP URL handlers @@ -945,9 +962,3 @@ func (cdrS *CDRServer) V1CountCDRs(args *utils.RPCCDRsFilterWithArgDispatcher, c *cnt = qryCnt return nil } - -// SetStorDB sets the new StorDB -// only used on reload -func (cdrS *CDRServer) SetStorDB(cdrDb CdrStorage) { - cdrS.cdrDb = cdrDb -} diff --git a/services/apierv1.go b/services/apierv1.go index cfb348806..afdf32a9c 100644 --- a/services/apierv1.go +++ b/services/apierv1.go @@ -21,6 +21,7 @@ package services import ( "fmt" "sync" + "time" v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/config" @@ -65,118 +66,106 @@ type ApierV1Service struct { api *v1.ApierV1 connChan chan rpcclient.ClientConnector - syncStop chan struct{} - storDBChan chan engine.StorDB + syncStop chan struct{} } // Start should handle the sercive start // For this service the start should be called from RAL Service -func (api *ApierV1Service) Start() (err error) { - if api.IsRunning() { +func (apiService *ApierV1Service) Start() (err error) { + if apiService.IsRunning() { return fmt.Errorf("service aleady running") } - filterS := <-api.filterSChan - api.filterSChan <- filterS - dbchan := api.dm.GetDMChan() + filterS := <-apiService.filterSChan + apiService.filterSChan <- filterS + dbchan := apiService.dm.GetDMChan() datadb := <-dbchan dbchan <- datadb - api.Lock() - defer api.Unlock() + apiService.Lock() + defer apiService.Unlock() - api.storDBChan = make(chan engine.StorDB, 1) - api.syncStop = make(chan struct{}) - api.storDB.RegisterSyncChan(api.storDBChan) - stordb := <-api.storDBChan + storDBChan := make(chan engine.StorDB, 1) + apiService.syncStop = make(chan struct{}) + apiService.storDB.RegisterSyncChan(storDBChan) + stordb := <-storDBChan - api.api = &v1.ApierV1{ + apiService.api = &v1.ApierV1{ DataManager: datadb, CdrDb: stordb, StorDb: stordb, - Config: api.cfg, - Responder: api.responderService.GetResponder(), - SchedulerService: api.schedService, - HTTPPoster: engine.NewHTTPPoster(api.cfg.GeneralCfg().HttpSkipTlsVerify, - api.cfg.GeneralCfg().ReplyTimeout), - FilterS: filterS, - ConnMgr: api.connMgr, + Config: apiService.cfg, + Responder: apiService.responderService.GetResponder(), + SchedulerService: apiService.schedService, + HTTPPoster: engine.NewHTTPPoster(apiService.cfg.GeneralCfg().HttpSkipTlsVerify, + apiService.cfg.GeneralCfg().ReplyTimeout), + FilterS: filterS, + ConnMgr: apiService.connMgr, + StorDBChan: storDBChan, } - if !api.cfg.DispatcherSCfg().Enabled { - api.server.RpcRegister(api.api) - api.server.RpcRegister(v1.NewReplicatorSv1(datadb)) + go func(api *v1.ApierV1, stopChan chan struct{}) { + if err := api.ListenAndServe(stopChan); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.CDRServer, err.Error())) + // erS.exitChan <- true + } + }(apiService.api, apiService.syncStop) + time.Sleep(1) + + if !apiService.cfg.DispatcherSCfg().Enabled { + apiService.server.RpcRegister(apiService.api) + apiService.server.RpcRegister(v1.NewReplicatorSv1(datadb)) } utils.RegisterRpcParams("", &v1.CDRsV1{}) utils.RegisterRpcParams("", &v1.SMGenericV1{}) - utils.RegisterRpcParams("", api.api) + utils.RegisterRpcParams("", apiService.api) - api.connChan <- api.api - go api.sync() + apiService.connChan <- apiService.api return } // GetIntenternalChan returns the internal connection chanel -func (api *ApierV1Service) GetIntenternalChan() (conn chan rpcclient.ClientConnector) { - return api.connChan +func (apiService *ApierV1Service) GetIntenternalChan() (conn chan rpcclient.ClientConnector) { + return apiService.connChan } // Reload handles the change of config -func (api *ApierV1Service) Reload() (err error) { +func (apiService *ApierV1Service) Reload() (err error) { return } // Shutdown stops the service -func (api *ApierV1Service) Shutdown() (err error) { - api.Lock() - close(api.syncStop) - api.api = nil - <-api.connChan - api.Unlock() +func (apiService *ApierV1Service) Shutdown() (err error) { + apiService.Lock() + close(apiService.syncStop) + apiService.api = nil + <-apiService.connChan + apiService.Unlock() return } // IsRunning returns if the service is running -func (api *ApierV1Service) IsRunning() bool { - api.RLock() - defer api.RUnlock() - return api != nil && api.api != nil +func (apiService *ApierV1Service) IsRunning() bool { + apiService.RLock() + defer apiService.RUnlock() + return apiService != nil && apiService.api != nil } // ServiceName returns the service name -func (api *ApierV1Service) ServiceName() string { +func (apiService *ApierV1Service) ServiceName() string { return utils.ApierV1 } // GetApierV1 returns the apierV1 -func (api *ApierV1Service) GetApierV1() *v1.ApierV1 { - api.RLock() - defer api.RUnlock() - return api.api +func (apiService *ApierV1Service) GetApierV1() *v1.ApierV1 { + apiService.RLock() + defer apiService.RUnlock() + return apiService.api } // ShouldRun returns if the service should be running -func (api *ApierV1Service) ShouldRun() bool { - return api.cfg.RalsCfg().Enabled -} - -// sync handles stordb sync -func (api *ApierV1Service) sync() { - for { - select { - case <-api.syncStop: - return - case stordb, ok := <-api.storDBChan: - if !ok { // the chanel was closed by the shutdown of stordbService - return - } - api.Lock() - if api.api != nil { - api.api.SetStorDB(stordb) - } - api.Unlock() - } - } +func (apiService *ApierV1Service) ShouldRun() bool { + return apiService.cfg.RalsCfg().Enabled } diff --git a/services/cdrs.go b/services/cdrs.go index dee2c4bd7..c9336d753 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -21,6 +21,7 @@ package services import ( "fmt" "sync" + "time" v1 "github.com/cgrates/cgrates/apier/v1" v2 "github.com/cgrates/cgrates/apier/v2" @@ -62,101 +63,87 @@ type CDRServer struct { connChan chan rpcclient.ClientConnector connMgr *engine.ConnManager - syncStop chan struct{} - storDBChan chan engine.StorDB + syncStop chan struct{} + // storDBChan chan engine.StorDB } // Start should handle the sercive start -func (cdrS *CDRServer) Start() (err error) { - if cdrS.IsRunning() { +func (cdrService *CDRServer) Start() (err error) { + if cdrService.IsRunning() { return fmt.Errorf("service aleady running") } utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs)) - filterS := <-cdrS.filterSChan - cdrS.filterSChan <- filterS - dbchan := cdrS.dm.GetDMChan() + filterS := <-cdrService.filterSChan + cdrService.filterSChan <- filterS + dbchan := cdrService.dm.GetDMChan() datadb := <-dbchan dbchan <- datadb - cdrS.Lock() - defer cdrS.Unlock() + cdrService.Lock() + defer cdrService.Unlock() - cdrS.storDBChan = make(chan engine.StorDB, 1) - cdrS.syncStop = make(chan struct{}) - cdrS.storDB.RegisterSyncChan(cdrS.storDBChan) - stordb := <-cdrS.storDBChan + storDBChan := make(chan engine.StorDB, 1) + cdrService.syncStop = make(chan struct{}) + cdrService.storDB.RegisterSyncChan(storDBChan) - cdrS.cdrS = engine.NewCDRServer(cdrS.cfg, stordb, datadb, filterS, cdrS.connMgr) + cdrService.cdrS = engine.NewCDRServer(cdrService.cfg, storDBChan, datadb, filterS, cdrService.connMgr) + go func(cdrS *engine.CDRServer, stopChan chan struct{}) { + if err := cdrS.ListenAndServe(stopChan); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.CDRServer, err.Error())) + // erS.exitChan <- true + } + }(cdrService.cdrS, cdrService.syncStop) + time.Sleep(1) utils.Logger.Info("Registering CDRS HTTP Handlers.") - cdrS.cdrS.RegisterHandlersToServer(cdrS.server) + cdrService.cdrS.RegisterHandlersToServer(cdrService.server) utils.Logger.Info("Registering CDRS RPC service.") - cdrS.rpcv1 = v1.NewCDRsV1(cdrS.cdrS) - cdrS.rpcv2 = &v2.CDRsV2{CDRsV1: *cdrS.rpcv1} - cdrS.server.RpcRegister(cdrS.rpcv1) - cdrS.server.RpcRegister(cdrS.rpcv2) + cdrService.rpcv1 = v1.NewCDRsV1(cdrService.cdrS) + cdrService.rpcv2 = &v2.CDRsV2{CDRsV1: *cdrService.rpcv1} + cdrService.server.RpcRegister(cdrService.rpcv1) + cdrService.server.RpcRegister(cdrService.rpcv2) // Make the cdr server available for internal communication - cdrS.server.RpcRegister(cdrS.cdrS) // register CdrServer for internal usage (TODO: refactor this) - cdrS.connChan <- cdrS.cdrS // Signal that cdrS is operational - go cdrS.sync() + cdrService.server.RpcRegister(cdrService.cdrS) // register CdrServer for internal usage (TODO: refactor this) + cdrService.connChan <- cdrService.cdrS // Signal that cdrS is operational return } // GetIntenternalChan returns the internal connection chanel -func (cdrS *CDRServer) GetIntenternalChan() (conn chan rpcclient.ClientConnector) { - return cdrS.connChan +func (cdrService *CDRServer) GetIntenternalChan() (conn chan rpcclient.ClientConnector) { + return cdrService.connChan } // Reload handles the change of config -func (cdrS *CDRServer) Reload() (err error) { +func (cdrService *CDRServer) Reload() (err error) { return } // Shutdown stops the service -func (cdrS *CDRServer) Shutdown() (err error) { - cdrS.Lock() - close(cdrS.syncStop) - cdrS.cdrS = nil - cdrS.rpcv1 = nil - cdrS.rpcv2 = nil - <-cdrS.connChan - cdrS.Unlock() +func (cdrService *CDRServer) Shutdown() (err error) { + cdrService.Lock() + close(cdrService.syncStop) + cdrService.cdrS = nil + cdrService.rpcv1 = nil + cdrService.rpcv2 = nil + <-cdrService.connChan + cdrService.Unlock() return } // IsRunning returns if the service is running -func (cdrS *CDRServer) IsRunning() bool { - cdrS.RLock() - defer cdrS.RUnlock() - return cdrS != nil && cdrS.cdrS != nil +func (cdrService *CDRServer) IsRunning() bool { + cdrService.RLock() + defer cdrService.RUnlock() + return cdrService != nil && cdrService.cdrS != nil } // ServiceName returns the service name -func (cdrS *CDRServer) ServiceName() string { +func (cdrService *CDRServer) ServiceName() string { return utils.CDRServer } // ShouldRun returns if the service should be running -func (cdrS *CDRServer) ShouldRun() bool { - return cdrS.cfg.CdrsCfg().Enabled -} - -// sync handles stordb sync -func (cdrS *CDRServer) sync() { - for { - select { - case <-cdrS.syncStop: - return - case stordb, ok := <-cdrS.storDBChan: - if !ok { // the chanel was closed by the shutdown of stordbService - return - } - cdrS.Lock() - if cdrS.cdrS != nil { - cdrS.cdrS.SetStorDB(stordb) - } - cdrS.Unlock() - } - } +func (cdrService *CDRServer) ShouldRun() bool { + return cdrService.cfg.CdrsCfg().Enabled } diff --git a/services/dnsagent.go b/services/dnsagent.go index db4dc1026..485cac71b 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -50,6 +50,8 @@ type DNSAgent struct { dns *agents.DNSAgent connMgr *engine.ConnManager + + oldListen string } // Start should handle the sercive start @@ -63,7 +65,7 @@ func (dns *DNSAgent) Start() (err error) { dns.Lock() defer dns.Unlock() - + dns.oldListen = dns.cfg.DNSAgentCfg().Listen dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, dns.connMgr) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) @@ -86,10 +88,14 @@ func (dns *DNSAgent) GetIntenternalChan() (conn chan rpcclient.ClientConnector) // Reload handles the change of config func (dns *DNSAgent) Reload() (err error) { + if dns.oldListen == dns.cfg.DNSAgentCfg().Listen { + return + } if err = dns.Shutdown(); err != nil { return } dns.Lock() + dns.oldListen = dns.cfg.DNSAgentCfg().Listen defer dns.Unlock() if err = dns.dns.Reload(); err != nil { return