mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Updated stordb reload
This commit is contained in:
@@ -50,6 +50,8 @@ type ApierV1 struct {
|
||||
HTTPPoster *engine.HTTPPoster
|
||||
FilterS *engine.FilterS //Used for CDR Exporter
|
||||
ConnMgr *engine.ConnManager
|
||||
|
||||
StorDBChan chan engine.StorDB
|
||||
}
|
||||
|
||||
// Call implements rpcclient.ClientConnector interface for internal RPC
|
||||
@@ -1373,9 +1375,18 @@ func (apiv1 *ApierV1) GetRatingPlanIDs(args utils.TenantArgWithPaginator, attrPr
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetStorDB sets the new connection for StorDB
|
||||
// only used on reload
|
||||
func (apiv1 *ApierV1) SetStorDB(storDB engine.StorDB) {
|
||||
apiv1.CdrDb = storDB
|
||||
apiv1.StorDb = storDB
|
||||
// ListenAndServe listen for storbd reload
|
||||
func (apiv1 *ApierV1) ListenAndServe(stopChan chan struct{}) (err error) {
|
||||
for {
|
||||
select {
|
||||
case <-stopChan:
|
||||
return
|
||||
case stordb, ok := <-apiv1.StorDBChan:
|
||||
if !ok { // the chanel was closed by the shutdown of stordbService
|
||||
return
|
||||
}
|
||||
apiv1.CdrDb = stordb
|
||||
apiv1.StorDb = stordb
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,9 +68,9 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// NewCDRServer is a constructor for CDRServer
|
||||
func NewCDRServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, filterS *FilterS,
|
||||
func NewCDRServer(cgrCfg *config.CGRConfig, storDBChan chan StorDB, dm *DataManager, filterS *FilterS,
|
||||
connMgr *ConnManager) *CDRServer {
|
||||
|
||||
cdrDb := <-storDBChan
|
||||
return &CDRServer{
|
||||
cgrCfg: cgrCfg,
|
||||
cdrDb: cdrDb,
|
||||
@@ -78,8 +78,9 @@ func NewCDRServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, f
|
||||
guard: guardian.Guardian,
|
||||
httpPoster: NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify,
|
||||
cgrCfg.GeneralCfg().ReplyTimeout),
|
||||
filterS: filterS,
|
||||
connMgr: connMgr,
|
||||
filterS: filterS,
|
||||
connMgr: connMgr,
|
||||
storDBChan: storDBChan,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,6 +93,22 @@ type CDRServer struct {
|
||||
httpPoster *HTTPPoster // used for replication
|
||||
filterS *FilterS
|
||||
connMgr *ConnManager
|
||||
storDBChan chan StorDB
|
||||
}
|
||||
|
||||
// ListenAndServe listen for storbd reload
|
||||
func (cdrS *CDRServer) ListenAndServe(stopChan chan struct{}) (err error) {
|
||||
for {
|
||||
select {
|
||||
case <-stopChan:
|
||||
return
|
||||
case stordb, ok := <-cdrS.storDBChan:
|
||||
if !ok { // the chanel was closed by the shutdown of stordbService
|
||||
return
|
||||
}
|
||||
cdrS.cdrDb = stordb
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterHandlersToServer is called by cgr-engine to register HTTP URL handlers
|
||||
@@ -945,9 +962,3 @@ func (cdrS *CDRServer) V1CountCDRs(args *utils.RPCCDRsFilterWithArgDispatcher, c
|
||||
*cnt = qryCnt
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetStorDB sets the new StorDB
|
||||
// only used on reload
|
||||
func (cdrS *CDRServer) SetStorDB(cdrDb CdrStorage) {
|
||||
cdrS.cdrDb = cdrDb
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ package services
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
@@ -65,118 +66,106 @@ type ApierV1Service struct {
|
||||
api *v1.ApierV1
|
||||
connChan chan rpcclient.ClientConnector
|
||||
|
||||
syncStop chan struct{}
|
||||
storDBChan chan engine.StorDB
|
||||
syncStop chan struct{}
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
// For this service the start should be called from RAL Service
|
||||
func (api *ApierV1Service) Start() (err error) {
|
||||
if api.IsRunning() {
|
||||
func (apiService *ApierV1Service) Start() (err error) {
|
||||
if apiService.IsRunning() {
|
||||
return fmt.Errorf("service aleady running")
|
||||
}
|
||||
|
||||
filterS := <-api.filterSChan
|
||||
api.filterSChan <- filterS
|
||||
dbchan := api.dm.GetDMChan()
|
||||
filterS := <-apiService.filterSChan
|
||||
apiService.filterSChan <- filterS
|
||||
dbchan := apiService.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
api.Lock()
|
||||
defer api.Unlock()
|
||||
apiService.Lock()
|
||||
defer apiService.Unlock()
|
||||
|
||||
api.storDBChan = make(chan engine.StorDB, 1)
|
||||
api.syncStop = make(chan struct{})
|
||||
api.storDB.RegisterSyncChan(api.storDBChan)
|
||||
stordb := <-api.storDBChan
|
||||
storDBChan := make(chan engine.StorDB, 1)
|
||||
apiService.syncStop = make(chan struct{})
|
||||
apiService.storDB.RegisterSyncChan(storDBChan)
|
||||
stordb := <-storDBChan
|
||||
|
||||
api.api = &v1.ApierV1{
|
||||
apiService.api = &v1.ApierV1{
|
||||
DataManager: datadb,
|
||||
CdrDb: stordb,
|
||||
StorDb: stordb,
|
||||
Config: api.cfg,
|
||||
Responder: api.responderService.GetResponder(),
|
||||
SchedulerService: api.schedService,
|
||||
HTTPPoster: engine.NewHTTPPoster(api.cfg.GeneralCfg().HttpSkipTlsVerify,
|
||||
api.cfg.GeneralCfg().ReplyTimeout),
|
||||
FilterS: filterS,
|
||||
ConnMgr: api.connMgr,
|
||||
Config: apiService.cfg,
|
||||
Responder: apiService.responderService.GetResponder(),
|
||||
SchedulerService: apiService.schedService,
|
||||
HTTPPoster: engine.NewHTTPPoster(apiService.cfg.GeneralCfg().HttpSkipTlsVerify,
|
||||
apiService.cfg.GeneralCfg().ReplyTimeout),
|
||||
FilterS: filterS,
|
||||
ConnMgr: apiService.connMgr,
|
||||
StorDBChan: storDBChan,
|
||||
}
|
||||
|
||||
if !api.cfg.DispatcherSCfg().Enabled {
|
||||
api.server.RpcRegister(api.api)
|
||||
api.server.RpcRegister(v1.NewReplicatorSv1(datadb))
|
||||
go func(api *v1.ApierV1, stopChan chan struct{}) {
|
||||
if err := api.ListenAndServe(stopChan); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.CDRServer, err.Error()))
|
||||
// erS.exitChan <- true
|
||||
}
|
||||
}(apiService.api, apiService.syncStop)
|
||||
time.Sleep(1)
|
||||
|
||||
if !apiService.cfg.DispatcherSCfg().Enabled {
|
||||
apiService.server.RpcRegister(apiService.api)
|
||||
apiService.server.RpcRegister(v1.NewReplicatorSv1(datadb))
|
||||
}
|
||||
|
||||
utils.RegisterRpcParams("", &v1.CDRsV1{})
|
||||
utils.RegisterRpcParams("", &v1.SMGenericV1{})
|
||||
utils.RegisterRpcParams("", api.api)
|
||||
utils.RegisterRpcParams("", apiService.api)
|
||||
|
||||
api.connChan <- api.api
|
||||
go api.sync()
|
||||
apiService.connChan <- apiService.api
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// GetIntenternalChan returns the internal connection chanel
|
||||
func (api *ApierV1Service) GetIntenternalChan() (conn chan rpcclient.ClientConnector) {
|
||||
return api.connChan
|
||||
func (apiService *ApierV1Service) GetIntenternalChan() (conn chan rpcclient.ClientConnector) {
|
||||
return apiService.connChan
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (api *ApierV1Service) Reload() (err error) {
|
||||
func (apiService *ApierV1Service) Reload() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (api *ApierV1Service) Shutdown() (err error) {
|
||||
api.Lock()
|
||||
close(api.syncStop)
|
||||
api.api = nil
|
||||
<-api.connChan
|
||||
api.Unlock()
|
||||
func (apiService *ApierV1Service) Shutdown() (err error) {
|
||||
apiService.Lock()
|
||||
close(apiService.syncStop)
|
||||
apiService.api = nil
|
||||
<-apiService.connChan
|
||||
apiService.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (api *ApierV1Service) IsRunning() bool {
|
||||
api.RLock()
|
||||
defer api.RUnlock()
|
||||
return api != nil && api.api != nil
|
||||
func (apiService *ApierV1Service) IsRunning() bool {
|
||||
apiService.RLock()
|
||||
defer apiService.RUnlock()
|
||||
return apiService != nil && apiService.api != nil
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (api *ApierV1Service) ServiceName() string {
|
||||
func (apiService *ApierV1Service) ServiceName() string {
|
||||
return utils.ApierV1
|
||||
}
|
||||
|
||||
// GetApierV1 returns the apierV1
|
||||
func (api *ApierV1Service) GetApierV1() *v1.ApierV1 {
|
||||
api.RLock()
|
||||
defer api.RUnlock()
|
||||
return api.api
|
||||
func (apiService *ApierV1Service) GetApierV1() *v1.ApierV1 {
|
||||
apiService.RLock()
|
||||
defer apiService.RUnlock()
|
||||
return apiService.api
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (api *ApierV1Service) ShouldRun() bool {
|
||||
return api.cfg.RalsCfg().Enabled
|
||||
}
|
||||
|
||||
// sync handles stordb sync
|
||||
func (api *ApierV1Service) sync() {
|
||||
for {
|
||||
select {
|
||||
case <-api.syncStop:
|
||||
return
|
||||
case stordb, ok := <-api.storDBChan:
|
||||
if !ok { // the chanel was closed by the shutdown of stordbService
|
||||
return
|
||||
}
|
||||
api.Lock()
|
||||
if api.api != nil {
|
||||
api.api.SetStorDB(stordb)
|
||||
}
|
||||
api.Unlock()
|
||||
}
|
||||
}
|
||||
func (apiService *ApierV1Service) ShouldRun() bool {
|
||||
return apiService.cfg.RalsCfg().Enabled
|
||||
}
|
||||
|
||||
105
services/cdrs.go
105
services/cdrs.go
@@ -21,6 +21,7 @@ package services
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
v2 "github.com/cgrates/cgrates/apier/v2"
|
||||
@@ -62,101 +63,87 @@ type CDRServer struct {
|
||||
connChan chan rpcclient.ClientConnector
|
||||
connMgr *engine.ConnManager
|
||||
|
||||
syncStop chan struct{}
|
||||
storDBChan chan engine.StorDB
|
||||
syncStop chan struct{}
|
||||
// storDBChan chan engine.StorDB
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (cdrS *CDRServer) Start() (err error) {
|
||||
if cdrS.IsRunning() {
|
||||
func (cdrService *CDRServer) Start() (err error) {
|
||||
if cdrService.IsRunning() {
|
||||
return fmt.Errorf("service aleady running")
|
||||
}
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs))
|
||||
|
||||
filterS := <-cdrS.filterSChan
|
||||
cdrS.filterSChan <- filterS
|
||||
dbchan := cdrS.dm.GetDMChan()
|
||||
filterS := <-cdrService.filterSChan
|
||||
cdrService.filterSChan <- filterS
|
||||
dbchan := cdrService.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
cdrS.Lock()
|
||||
defer cdrS.Unlock()
|
||||
cdrService.Lock()
|
||||
defer cdrService.Unlock()
|
||||
|
||||
cdrS.storDBChan = make(chan engine.StorDB, 1)
|
||||
cdrS.syncStop = make(chan struct{})
|
||||
cdrS.storDB.RegisterSyncChan(cdrS.storDBChan)
|
||||
stordb := <-cdrS.storDBChan
|
||||
storDBChan := make(chan engine.StorDB, 1)
|
||||
cdrService.syncStop = make(chan struct{})
|
||||
cdrService.storDB.RegisterSyncChan(storDBChan)
|
||||
|
||||
cdrS.cdrS = engine.NewCDRServer(cdrS.cfg, stordb, datadb, filterS, cdrS.connMgr)
|
||||
cdrService.cdrS = engine.NewCDRServer(cdrService.cfg, storDBChan, datadb, filterS, cdrService.connMgr)
|
||||
go func(cdrS *engine.CDRServer, stopChan chan struct{}) {
|
||||
if err := cdrS.ListenAndServe(stopChan); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.CDRServer, err.Error()))
|
||||
// erS.exitChan <- true
|
||||
}
|
||||
}(cdrService.cdrS, cdrService.syncStop)
|
||||
time.Sleep(1)
|
||||
utils.Logger.Info("Registering CDRS HTTP Handlers.")
|
||||
cdrS.cdrS.RegisterHandlersToServer(cdrS.server)
|
||||
cdrService.cdrS.RegisterHandlersToServer(cdrService.server)
|
||||
utils.Logger.Info("Registering CDRS RPC service.")
|
||||
cdrS.rpcv1 = v1.NewCDRsV1(cdrS.cdrS)
|
||||
cdrS.rpcv2 = &v2.CDRsV2{CDRsV1: *cdrS.rpcv1}
|
||||
cdrS.server.RpcRegister(cdrS.rpcv1)
|
||||
cdrS.server.RpcRegister(cdrS.rpcv2)
|
||||
cdrService.rpcv1 = v1.NewCDRsV1(cdrService.cdrS)
|
||||
cdrService.rpcv2 = &v2.CDRsV2{CDRsV1: *cdrService.rpcv1}
|
||||
cdrService.server.RpcRegister(cdrService.rpcv1)
|
||||
cdrService.server.RpcRegister(cdrService.rpcv2)
|
||||
// Make the cdr server available for internal communication
|
||||
cdrS.server.RpcRegister(cdrS.cdrS) // register CdrServer for internal usage (TODO: refactor this)
|
||||
cdrS.connChan <- cdrS.cdrS // Signal that cdrS is operational
|
||||
go cdrS.sync()
|
||||
cdrService.server.RpcRegister(cdrService.cdrS) // register CdrServer for internal usage (TODO: refactor this)
|
||||
cdrService.connChan <- cdrService.cdrS // Signal that cdrS is operational
|
||||
return
|
||||
}
|
||||
|
||||
// GetIntenternalChan returns the internal connection chanel
|
||||
func (cdrS *CDRServer) GetIntenternalChan() (conn chan rpcclient.ClientConnector) {
|
||||
return cdrS.connChan
|
||||
func (cdrService *CDRServer) GetIntenternalChan() (conn chan rpcclient.ClientConnector) {
|
||||
return cdrService.connChan
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (cdrS *CDRServer) Reload() (err error) {
|
||||
func (cdrService *CDRServer) Reload() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (cdrS *CDRServer) Shutdown() (err error) {
|
||||
cdrS.Lock()
|
||||
close(cdrS.syncStop)
|
||||
cdrS.cdrS = nil
|
||||
cdrS.rpcv1 = nil
|
||||
cdrS.rpcv2 = nil
|
||||
<-cdrS.connChan
|
||||
cdrS.Unlock()
|
||||
func (cdrService *CDRServer) Shutdown() (err error) {
|
||||
cdrService.Lock()
|
||||
close(cdrService.syncStop)
|
||||
cdrService.cdrS = nil
|
||||
cdrService.rpcv1 = nil
|
||||
cdrService.rpcv2 = nil
|
||||
<-cdrService.connChan
|
||||
cdrService.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (cdrS *CDRServer) IsRunning() bool {
|
||||
cdrS.RLock()
|
||||
defer cdrS.RUnlock()
|
||||
return cdrS != nil && cdrS.cdrS != nil
|
||||
func (cdrService *CDRServer) IsRunning() bool {
|
||||
cdrService.RLock()
|
||||
defer cdrService.RUnlock()
|
||||
return cdrService != nil && cdrService.cdrS != nil
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (cdrS *CDRServer) ServiceName() string {
|
||||
func (cdrService *CDRServer) ServiceName() string {
|
||||
return utils.CDRServer
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (cdrS *CDRServer) ShouldRun() bool {
|
||||
return cdrS.cfg.CdrsCfg().Enabled
|
||||
}
|
||||
|
||||
// sync handles stordb sync
|
||||
func (cdrS *CDRServer) sync() {
|
||||
for {
|
||||
select {
|
||||
case <-cdrS.syncStop:
|
||||
return
|
||||
case stordb, ok := <-cdrS.storDBChan:
|
||||
if !ok { // the chanel was closed by the shutdown of stordbService
|
||||
return
|
||||
}
|
||||
cdrS.Lock()
|
||||
if cdrS.cdrS != nil {
|
||||
cdrS.cdrS.SetStorDB(stordb)
|
||||
}
|
||||
cdrS.Unlock()
|
||||
}
|
||||
}
|
||||
func (cdrService *CDRServer) ShouldRun() bool {
|
||||
return cdrService.cfg.CdrsCfg().Enabled
|
||||
}
|
||||
|
||||
@@ -50,6 +50,8 @@ type DNSAgent struct {
|
||||
|
||||
dns *agents.DNSAgent
|
||||
connMgr *engine.ConnManager
|
||||
|
||||
oldListen string
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
@@ -63,7 +65,7 @@ func (dns *DNSAgent) Start() (err error) {
|
||||
|
||||
dns.Lock()
|
||||
defer dns.Unlock()
|
||||
|
||||
dns.oldListen = dns.cfg.DNSAgentCfg().Listen
|
||||
dns.dns, err = agents.NewDNSAgent(dns.cfg, filterS, dns.connMgr)
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
|
||||
@@ -86,10 +88,14 @@ func (dns *DNSAgent) GetIntenternalChan() (conn chan rpcclient.ClientConnector)
|
||||
|
||||
// Reload handles the change of config
|
||||
func (dns *DNSAgent) Reload() (err error) {
|
||||
if dns.oldListen == dns.cfg.DNSAgentCfg().Listen {
|
||||
return
|
||||
}
|
||||
if err = dns.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
dns.Lock()
|
||||
dns.oldListen = dns.cfg.DNSAgentCfg().Listen
|
||||
defer dns.Unlock()
|
||||
if err = dns.dns.Reload(); err != nil {
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user