Added Locks for services

This commit is contained in:
Trial97
2019-09-19 17:06:43 +03:00
committed by Dan Christian Bogos
parent 1cffc4ca90
commit 0fd0813b24
7 changed files with 65 additions and 11 deletions

View File

@@ -20,6 +20,7 @@ package services
import (
"fmt"
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/engine"
@@ -37,6 +38,7 @@ func NewAttributeService() servmanager.Service {
// AttributeService implements Service interface
type AttributeService struct {
sync.RWMutex
attrS *engine.AttributeService
rpc *v1.AttributeSv1
connChan chan rpcclient.RpcClientConnection
@@ -53,6 +55,8 @@ func (attrS *AttributeService) Start(sp servmanager.ServiceProvider, waitCache b
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheAttributeFilterIndexes)
}
attrS.Lock()
defer attrS.Unlock()
attrS.attrS, err = engine.NewAttributeService(sp.GetDM(),
sp.GetFilterS(), sp.GetConfig())
if err != nil {
@@ -62,12 +66,11 @@ func (attrS *AttributeService) Start(sp servmanager.ServiceProvider, waitCache b
return
}
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.ServiceManager, utils.AttributeS))
aSv1 := v1.NewAttributeSv1(attrS.attrS)
attrS.rpc = v1.NewAttributeSv1(attrS.attrS)
if !sp.GetConfig().DispatcherSCfg().Enabled {
sp.GetServer().RpcRegister(aSv1)
sp.GetServer().RpcRegister(attrS.rpc)
}
attrS.connChan <- aSv1
attrS.rpc = aSv1
attrS.connChan <- attrS.rpc
return
}
@@ -83,6 +86,8 @@ func (attrS *AttributeService) Reload(sp servmanager.ServiceProvider) (err error
// Shutdown stops the service
func (attrS *AttributeService) Shutdown() (err error) {
attrS.Lock()
defer attrS.Unlock()
if err = attrS.attrS.Shutdown(); err != nil {
return
}
@@ -99,6 +104,8 @@ func (attrS *AttributeService) GetRPCInterface() interface{} {
// IsRunning returns if the service is running
func (attrS *AttributeService) IsRunning() bool {
attrS.RLock()
defer attrS.RUnlock()
return attrS != nil && attrS.attrS != nil
}

View File

@@ -20,6 +20,7 @@ package services
import (
"fmt"
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/engine"
@@ -37,6 +38,7 @@ func NewChargerService() servmanager.Service {
// ChargerService implements Service interface
type ChargerService struct {
sync.RWMutex
chrS *engine.ChargerService
rpc *v1.ChargerSv1
connChan chan rpcclient.RpcClientConnection
@@ -57,7 +59,8 @@ func (chrS *ChargerService) Start(sp servmanager.ServiceProvider, waitCache bool
utils.ChargerS, utils.AttributeS, err.Error()))
return
}
chrS.Lock()
defer chrS.Unlock()
if chrS.chrS, err = engine.NewChargerService(sp.GetDM(), sp.GetFilterS(), chrSConn, sp.GetConfig()); err != nil {
utils.Logger.Crit(
fmt.Sprintf("<%s> Could not init, error: %s",
@@ -86,6 +89,8 @@ func (chrS *ChargerService) Reload(sp servmanager.ServiceProvider) (err error) {
// Shutdown stops the service
func (chrS *ChargerService) Shutdown() (err error) {
chrS.Lock()
defer chrS.Unlock()
if err = chrS.chrS.Shutdown(); err != nil {
return
}
@@ -102,6 +107,8 @@ func (chrS *ChargerService) GetRPCInterface() interface{} {
// IsRunning returns if the service is running
func (chrS *ChargerService) IsRunning() bool {
chrS.RLock()
defer chrS.RUnlock()
return chrS != nil && chrS.chrS != nil
}

View File

@@ -20,6 +20,7 @@ package services
import (
"fmt"
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/engine"
@@ -37,6 +38,7 @@ func NewResourceService() servmanager.Service {
// ResourceService implements Service interface
type ResourceService struct {
sync.RWMutex
reS *engine.ResourceService
rpc *v1.ResourceSv1
connChan chan rpcclient.RpcClientConnection
@@ -58,6 +60,8 @@ func (reS *ResourceService) Start(sp servmanager.ServiceProvider, waitCache bool
return
}
reS.Lock()
defer reS.Unlock()
reS.reS, err = engine.NewResourceService(sp.GetDM(), sp.GetConfig(), thdSConn, sp.GetFilterS())
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ResourceS, err.Error()))
@@ -80,12 +84,16 @@ func (reS *ResourceService) GetIntenternalChan() (conn chan rpcclient.RpcClientC
// Reload handles the change of config
func (reS *ResourceService) Reload(sp servmanager.ServiceProvider) (err error) {
reS.Lock()
reS.reS.Reload()
defer reS.Unlock()
return
}
// Shutdown stops the service
func (reS *ResourceService) Shutdown() (err error) {
reS.Lock()
defer reS.Unlock()
if err = reS.reS.Shutdown(); err != nil {
return
}
@@ -102,6 +110,8 @@ func (reS *ResourceService) GetRPCInterface() interface{} {
// IsRunning returns if the service is running
func (reS *ResourceService) IsRunning() bool {
reS.RLock()
defer reS.RUnlock()
return reS != nil && reS.reS != nil
}

View File

@@ -51,10 +51,13 @@ func (schS *SchedulerService) Start(sp servmanager.ServiceProvider, waitCache bo
return fmt.Errorf("service aleady running")
}
schS.Lock()
if waitCache { // Wait for cache to load data before starting
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached
}
schS.Lock()
defer schS.Unlock()
utils.Logger.Info("<ServiceManager> Starting CGRateS Scheduler.")
schS.schS = scheduler.NewScheduler(sp.GetDM())
go schS.schS.Loop()
@@ -63,7 +66,6 @@ func (schS *SchedulerService) Start(sp servmanager.ServiceProvider, waitCache bo
if !sp.GetConfig().DispatcherSCfg().Enabled {
sp.GetServer().RpcRegister(schS.rpc)
}
schS.Unlock()
schS.connChan <- schS.rpc
// Create connection to CDR Server and share it in engine(used for *cdrlog action)
@@ -86,20 +88,20 @@ func (schS *SchedulerService) GetIntenternalChan() (conn chan rpcclient.RpcClien
// Reload handles the change of config
func (schS *SchedulerService) Reload(sp servmanager.ServiceProvider) (err error) {
schS.RLock()
schS.Lock()
schS.schS.Reload()
defer schS.RUnlock()
defer schS.Unlock()
return
}
// Shutdown stops the service
func (schS *SchedulerService) Shutdown() (err error) {
schS.schS.Shutdown()
schS.Lock()
schS.schS.Shutdown()
schS.schS = nil
schS.rpc = nil
schS.Unlock()
<-schS.connChan
schS.Unlock()
return
}

View File

@@ -20,6 +20,7 @@ package services
import (
"fmt"
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/engine"
@@ -37,6 +38,7 @@ func NewStatService() servmanager.Service {
// StatService implements Service interface
type StatService struct {
sync.RWMutex
sts *engine.StatService
rpc *v1.StatSv1
connChan chan rpcclient.RpcClientConnection
@@ -58,6 +60,8 @@ func (sts *StatService) Start(sp servmanager.ServiceProvider, waitCache bool) (e
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.StatS, err.Error()))
return
}
sts.Lock()
defer sts.Unlock()
sts.sts, err = engine.NewStatService(sp.GetDM(), sp.GetConfig(), thdSConn, sp.GetFilterS())
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not init, error: %s", err.Error()))
@@ -80,12 +84,16 @@ func (sts *StatService) GetIntenternalChan() (conn chan rpcclient.RpcClientConne
// Reload handles the change of config
func (sts *StatService) Reload(sp servmanager.ServiceProvider) (err error) {
sts.Lock()
sts.sts.Reload()
sts.Unlock()
return
}
// Shutdown stops the service
func (sts *StatService) Shutdown() (err error) {
sts.Lock()
defer sts.Unlock()
if err = sts.sts.Shutdown(); err != nil {
return
}
@@ -102,6 +110,8 @@ func (sts *StatService) GetRPCInterface() interface{} {
// IsRunning returns if the service is running
func (sts *StatService) IsRunning() bool {
sts.RLock()
defer sts.RUnlock()
return sts != nil && sts.sts != nil
}

View File

@@ -20,6 +20,7 @@ package services
import (
"fmt"
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/engine"
@@ -37,6 +38,7 @@ func NewSupplierService() servmanager.Service {
// SupplierService implements Service interface
type SupplierService struct {
sync.RWMutex
splS *engine.SupplierService
rpc *v1.SupplierSv1
connChan chan rpcclient.RpcClientConnection
@@ -79,6 +81,8 @@ func (splS *SupplierService) Start(sp servmanager.ServiceProvider, waitCache boo
utils.SupplierS, err.Error()))
return
}
splS.Lock()
defer splS.Unlock()
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.SupplierS))
splS.rpc = v1.NewSupplierSv1(splS.splS)
if !sp.GetConfig().DispatcherSCfg().Enabled {
@@ -100,6 +104,8 @@ func (splS *SupplierService) Reload(sp servmanager.ServiceProvider) (err error)
// Shutdown stops the service
func (splS *SupplierService) Shutdown() (err error) {
splS.Lock()
defer splS.Unlock()
if err = splS.splS.Shutdown(); err != nil {
return
}
@@ -116,6 +122,8 @@ func (splS *SupplierService) GetRPCInterface() interface{} {
// IsRunning returns if the service is running
func (splS *SupplierService) IsRunning() bool {
splS.RLock()
defer splS.RUnlock()
return splS != nil && splS.splS != nil
}

View File

@@ -20,6 +20,7 @@ package services
import (
"fmt"
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/engine"
@@ -37,6 +38,7 @@ func NewThresholdService() servmanager.Service {
// ThresholdService implements Service interface
type ThresholdService struct {
sync.RWMutex
thrs *engine.ThresholdService
rpc *v1.ThresholdSv1
connChan chan rpcclient.RpcClientConnection
@@ -54,6 +56,8 @@ func (thrs *ThresholdService) Start(sp servmanager.ServiceProvider, waitCache bo
<-sp.GetCacheS().GetPrecacheChannel(utils.CacheThresholdFilterIndexes)
}
thrs.Lock()
defer thrs.Unlock()
thrs.thrs, err = engine.NewThresholdService(sp.GetDM(), sp.GetConfig(), sp.GetFilterS())
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ThresholdS, err.Error()))
@@ -76,12 +80,16 @@ func (thrs *ThresholdService) GetIntenternalChan() (conn chan rpcclient.RpcClien
// Reload handles the change of config
func (thrs *ThresholdService) Reload(sp servmanager.ServiceProvider) (err error) {
thrs.Lock()
thrs.thrs.Reload()
thrs.Unlock()
return
}
// Shutdown stops the service
func (thrs *ThresholdService) Shutdown() (err error) {
thrs.Lock()
defer thrs.Unlock()
if err = thrs.thrs.Shutdown(); err != nil {
return
}
@@ -98,6 +106,8 @@ func (thrs *ThresholdService) GetRPCInterface() interface{} {
// IsRunning returns if the service is running
func (thrs *ThresholdService) IsRunning() bool {
thrs.RLock()
defer thrs.RUnlock()
return thrs != nil && thrs.thrs != nil
}