diff --git a/services/attributes.go b/services/attributes.go index ba0f45b29..5b1ccb3b1 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -20,6 +20,7 @@ package services import ( "fmt" + "sync" v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/engine" @@ -37,6 +38,7 @@ func NewAttributeService() servmanager.Service { // AttributeService implements Service interface type AttributeService struct { + sync.RWMutex attrS *engine.AttributeService rpc *v1.AttributeSv1 connChan chan rpcclient.RpcClientConnection @@ -53,6 +55,8 @@ func (attrS *AttributeService) Start(sp servmanager.ServiceProvider, waitCache b <-sp.GetCacheS().GetPrecacheChannel(utils.CacheAttributeFilterIndexes) } + attrS.Lock() + defer attrS.Unlock() attrS.attrS, err = engine.NewAttributeService(sp.GetDM(), sp.GetFilterS(), sp.GetConfig()) if err != nil { @@ -62,12 +66,11 @@ func (attrS *AttributeService) Start(sp servmanager.ServiceProvider, waitCache b return } utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.ServiceManager, utils.AttributeS)) - aSv1 := v1.NewAttributeSv1(attrS.attrS) + attrS.rpc = v1.NewAttributeSv1(attrS.attrS) if !sp.GetConfig().DispatcherSCfg().Enabled { - sp.GetServer().RpcRegister(aSv1) + sp.GetServer().RpcRegister(attrS.rpc) } - attrS.connChan <- aSv1 - attrS.rpc = aSv1 + attrS.connChan <- attrS.rpc return } @@ -83,6 +86,8 @@ func (attrS *AttributeService) Reload(sp servmanager.ServiceProvider) (err error // Shutdown stops the service func (attrS *AttributeService) Shutdown() (err error) { + attrS.Lock() + defer attrS.Unlock() if err = attrS.attrS.Shutdown(); err != nil { return } @@ -99,6 +104,8 @@ func (attrS *AttributeService) GetRPCInterface() interface{} { // IsRunning returns if the service is running func (attrS *AttributeService) IsRunning() bool { + attrS.RLock() + defer attrS.RUnlock() return attrS != nil && attrS.attrS != nil } diff --git a/services/chargers.go b/services/chargers.go index b44dad7d8..1683603a7 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -20,6 +20,7 @@ package services import ( "fmt" + "sync" v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/engine" @@ -37,6 +38,7 @@ func NewChargerService() servmanager.Service { // ChargerService implements Service interface type ChargerService struct { + sync.RWMutex chrS *engine.ChargerService rpc *v1.ChargerSv1 connChan chan rpcclient.RpcClientConnection @@ -57,7 +59,8 @@ func (chrS *ChargerService) Start(sp servmanager.ServiceProvider, waitCache bool utils.ChargerS, utils.AttributeS, err.Error())) return } - + chrS.Lock() + defer chrS.Unlock() if chrS.chrS, err = engine.NewChargerService(sp.GetDM(), sp.GetFilterS(), chrSConn, sp.GetConfig()); err != nil { utils.Logger.Crit( fmt.Sprintf("<%s> Could not init, error: %s", @@ -86,6 +89,8 @@ func (chrS *ChargerService) Reload(sp servmanager.ServiceProvider) (err error) { // Shutdown stops the service func (chrS *ChargerService) Shutdown() (err error) { + chrS.Lock() + defer chrS.Unlock() if err = chrS.chrS.Shutdown(); err != nil { return } @@ -102,6 +107,8 @@ func (chrS *ChargerService) GetRPCInterface() interface{} { // IsRunning returns if the service is running func (chrS *ChargerService) IsRunning() bool { + chrS.RLock() + defer chrS.RUnlock() return chrS != nil && chrS.chrS != nil } diff --git a/services/resources.go b/services/resources.go index 4ed5a5d4d..c0f4545ec 100644 --- a/services/resources.go +++ b/services/resources.go @@ -20,6 +20,7 @@ package services import ( "fmt" + "sync" v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/engine" @@ -37,6 +38,7 @@ func NewResourceService() servmanager.Service { // ResourceService implements Service interface type ResourceService struct { + sync.RWMutex reS *engine.ResourceService rpc *v1.ResourceSv1 connChan chan rpcclient.RpcClientConnection @@ -58,6 +60,8 @@ func (reS *ResourceService) Start(sp servmanager.ServiceProvider, waitCache bool return } + reS.Lock() + defer reS.Unlock() reS.reS, err = engine.NewResourceService(sp.GetDM(), sp.GetConfig(), thdSConn, sp.GetFilterS()) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ResourceS, err.Error())) @@ -80,12 +84,16 @@ func (reS *ResourceService) GetIntenternalChan() (conn chan rpcclient.RpcClientC // Reload handles the change of config func (reS *ResourceService) Reload(sp servmanager.ServiceProvider) (err error) { + reS.Lock() reS.reS.Reload() + defer reS.Unlock() return } // Shutdown stops the service func (reS *ResourceService) Shutdown() (err error) { + reS.Lock() + defer reS.Unlock() if err = reS.reS.Shutdown(); err != nil { return } @@ -102,6 +110,8 @@ func (reS *ResourceService) GetRPCInterface() interface{} { // IsRunning returns if the service is running func (reS *ResourceService) IsRunning() bool { + reS.RLock() + defer reS.RUnlock() return reS != nil && reS.reS != nil } diff --git a/services/schedulers.go b/services/schedulers.go index 0a34a6725..8f156ace4 100644 --- a/services/schedulers.go +++ b/services/schedulers.go @@ -51,10 +51,13 @@ func (schS *SchedulerService) Start(sp servmanager.ServiceProvider, waitCache bo return fmt.Errorf("service aleady running") } - schS.Lock() if waitCache { // Wait for cache to load data before starting <-sp.GetCacheS().GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached } + + schS.Lock() + defer schS.Unlock() + utils.Logger.Info(" Starting CGRateS Scheduler.") schS.schS = scheduler.NewScheduler(sp.GetDM()) go schS.schS.Loop() @@ -63,7 +66,6 @@ func (schS *SchedulerService) Start(sp servmanager.ServiceProvider, waitCache bo if !sp.GetConfig().DispatcherSCfg().Enabled { sp.GetServer().RpcRegister(schS.rpc) } - schS.Unlock() schS.connChan <- schS.rpc // Create connection to CDR Server and share it in engine(used for *cdrlog action) @@ -86,20 +88,20 @@ func (schS *SchedulerService) GetIntenternalChan() (conn chan rpcclient.RpcClien // Reload handles the change of config func (schS *SchedulerService) Reload(sp servmanager.ServiceProvider) (err error) { - schS.RLock() + schS.Lock() schS.schS.Reload() - defer schS.RUnlock() + defer schS.Unlock() return } // Shutdown stops the service func (schS *SchedulerService) Shutdown() (err error) { - schS.schS.Shutdown() schS.Lock() + schS.schS.Shutdown() schS.schS = nil schS.rpc = nil - schS.Unlock() <-schS.connChan + schS.Unlock() return } diff --git a/services/stats.go b/services/stats.go index 3b5288dc1..c6dc09ff6 100644 --- a/services/stats.go +++ b/services/stats.go @@ -20,6 +20,7 @@ package services import ( "fmt" + "sync" v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/engine" @@ -37,6 +38,7 @@ func NewStatService() servmanager.Service { // StatService implements Service interface type StatService struct { + sync.RWMutex sts *engine.StatService rpc *v1.StatSv1 connChan chan rpcclient.RpcClientConnection @@ -58,6 +60,8 @@ func (sts *StatService) Start(sp servmanager.ServiceProvider, waitCache bool) (e utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.StatS, err.Error())) return } + sts.Lock() + defer sts.Unlock() sts.sts, err = engine.NewStatService(sp.GetDM(), sp.GetConfig(), thdSConn, sp.GetFilterS()) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) @@ -80,12 +84,16 @@ func (sts *StatService) GetIntenternalChan() (conn chan rpcclient.RpcClientConne // Reload handles the change of config func (sts *StatService) Reload(sp servmanager.ServiceProvider) (err error) { + sts.Lock() sts.sts.Reload() + sts.Unlock() return } // Shutdown stops the service func (sts *StatService) Shutdown() (err error) { + sts.Lock() + defer sts.Unlock() if err = sts.sts.Shutdown(); err != nil { return } @@ -102,6 +110,8 @@ func (sts *StatService) GetRPCInterface() interface{} { // IsRunning returns if the service is running func (sts *StatService) IsRunning() bool { + sts.RLock() + defer sts.RUnlock() return sts != nil && sts.sts != nil } diff --git a/services/suppliers.go b/services/suppliers.go index 35a5e56ac..db41e6d45 100644 --- a/services/suppliers.go +++ b/services/suppliers.go @@ -20,6 +20,7 @@ package services import ( "fmt" + "sync" v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/engine" @@ -37,6 +38,7 @@ func NewSupplierService() servmanager.Service { // SupplierService implements Service interface type SupplierService struct { + sync.RWMutex splS *engine.SupplierService rpc *v1.SupplierSv1 connChan chan rpcclient.RpcClientConnection @@ -79,6 +81,8 @@ func (splS *SupplierService) Start(sp servmanager.ServiceProvider, waitCache boo utils.SupplierS, err.Error())) return } + splS.Lock() + defer splS.Unlock() utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.SupplierS)) splS.rpc = v1.NewSupplierSv1(splS.splS) if !sp.GetConfig().DispatcherSCfg().Enabled { @@ -100,6 +104,8 @@ func (splS *SupplierService) Reload(sp servmanager.ServiceProvider) (err error) // Shutdown stops the service func (splS *SupplierService) Shutdown() (err error) { + splS.Lock() + defer splS.Unlock() if err = splS.splS.Shutdown(); err != nil { return } @@ -116,6 +122,8 @@ func (splS *SupplierService) GetRPCInterface() interface{} { // IsRunning returns if the service is running func (splS *SupplierService) IsRunning() bool { + splS.RLock() + defer splS.RUnlock() return splS != nil && splS.splS != nil } diff --git a/services/thresholds.go b/services/thresholds.go index 9218edc51..5db8f2bb9 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -20,6 +20,7 @@ package services import ( "fmt" + "sync" v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/engine" @@ -37,6 +38,7 @@ func NewThresholdService() servmanager.Service { // ThresholdService implements Service interface type ThresholdService struct { + sync.RWMutex thrs *engine.ThresholdService rpc *v1.ThresholdSv1 connChan chan rpcclient.RpcClientConnection @@ -54,6 +56,8 @@ func (thrs *ThresholdService) Start(sp servmanager.ServiceProvider, waitCache bo <-sp.GetCacheS().GetPrecacheChannel(utils.CacheThresholdFilterIndexes) } + thrs.Lock() + defer thrs.Unlock() thrs.thrs, err = engine.NewThresholdService(sp.GetDM(), sp.GetConfig(), sp.GetFilterS()) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ThresholdS, err.Error())) @@ -76,12 +80,16 @@ func (thrs *ThresholdService) GetIntenternalChan() (conn chan rpcclient.RpcClien // Reload handles the change of config func (thrs *ThresholdService) Reload(sp servmanager.ServiceProvider) (err error) { + thrs.Lock() thrs.thrs.Reload() + thrs.Unlock() return } // Shutdown stops the service func (thrs *ThresholdService) Shutdown() (err error) { + thrs.Lock() + defer thrs.Unlock() if err = thrs.thrs.Shutdown(); err != nil { return } @@ -98,6 +106,8 @@ func (thrs *ThresholdService) GetRPCInterface() interface{} { // IsRunning returns if the service is running func (thrs *ThresholdService) IsRunning() bool { + thrs.RLock() + defer thrs.RUnlock() return thrs != nil && thrs.thrs != nil }