mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Redesign of CacheS init
This commit is contained in:
@@ -903,14 +903,14 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS,
|
||||
}
|
||||
|
||||
// loaderService will start and register APIs for LoaderService if enabled
|
||||
func startloaderS(cfg *config.CGRConfig,
|
||||
func startLoaderS(cfg *config.CGRConfig,
|
||||
dm *engine.DataManager, server *utils.Server, exitChan chan bool,
|
||||
filterSChan chan *engine.FilterS, internalCacheSChan chan rpcclient.RpcClientConnection) {
|
||||
filterSChan chan *engine.FilterS, cacheSChan chan rpcclient.RpcClientConnection) {
|
||||
filterS := <-filterSChan
|
||||
filterSChan <- filterS
|
||||
|
||||
ldrS := loaders.NewLoaderService(dm, cfg.LoaderCfg(),
|
||||
cfg.GeneralCfg().DefaultTimezone, exitChan, filterS, internalCacheSChan)
|
||||
cfg.GeneralCfg().DefaultTimezone, exitChan, filterS, cacheSChan)
|
||||
if !ldrS.Enabled() {
|
||||
return
|
||||
}
|
||||
@@ -1034,6 +1034,24 @@ func startAnalyzerService(internalAnalyzerSChan chan rpcclient.RpcClientConnecti
|
||||
internalAnalyzerSChan <- aSv1
|
||||
}
|
||||
|
||||
// initCacheS inits the CacheS and starts precaching as well as populating internal channel for RPC conns
|
||||
func initCacheS(internalCacheSChan chan rpcclient.RpcClientConnection,
|
||||
server *utils.Server, dm *engine.DataManager, exitChan chan bool) (chS *engine.CacheS) {
|
||||
chS = engine.NewCacheS(cfg, dm)
|
||||
go func() {
|
||||
if err := chS.Precache(); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error()))
|
||||
exitChan <- true
|
||||
}
|
||||
}()
|
||||
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(v1.NewCacheSv1(chS))
|
||||
}
|
||||
internalCacheSChan <- chS
|
||||
return
|
||||
}
|
||||
|
||||
func startRpc(server *utils.Server, internalRaterChan,
|
||||
internalCdrSChan, internalRsChan, internalStatSChan,
|
||||
internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan,
|
||||
@@ -1330,22 +1348,6 @@ func main() {
|
||||
// Rpc/http server
|
||||
server := utils.NewServer()
|
||||
|
||||
// init cache
|
||||
cacheS := engine.NewCacheS(cfg, dm)
|
||||
cacheSv1 := v1.NewCacheSv1(cacheS)
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
server.RpcRegister(cacheSv1) // before pre-caching so we can check status via API
|
||||
}
|
||||
go func() {
|
||||
if err := cacheS.Precache(); err != nil {
|
||||
errCGR := err.(*utils.CGRError)
|
||||
errCGR.ActivateLongError()
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> error: %s on precache",
|
||||
utils.CacheS, err.Error()))
|
||||
exitChan <- true
|
||||
}
|
||||
}()
|
||||
|
||||
if *httpPprofPath != "" {
|
||||
go server.RegisterProfiler(*httpPprofPath)
|
||||
}
|
||||
@@ -1365,23 +1367,25 @@ func main() {
|
||||
internalDispatcherSChan := make(chan *dispatchers.DispatcherService, 1)
|
||||
internalAnalyzerSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
//Add cacheSv1 into channel
|
||||
internalCacheSChan <- cacheSv1
|
||||
|
||||
// init CacheS
|
||||
cacheS := initCacheS(internalCacheSChan, server, dm, exitChan)
|
||||
|
||||
// Start ServiceManager
|
||||
srvManager := servmanager.NewServiceManager(cfg, dm, exitChan, cacheS)
|
||||
|
||||
// Start rater service
|
||||
if cfg.RalsCfg().RALsEnabled {
|
||||
go startRater(internalRaterChan, cacheS, internalThresholdSChan,
|
||||
internalStatSChan, srvManager, server, dm, loadDb, cdrDb,
|
||||
&stopHandled, exitChan, filterSChan, internalCacheSChan)
|
||||
}
|
||||
|
||||
// Start Scheduler
|
||||
if cfg.SchedulerCfg().Enabled {
|
||||
go srvManager.StartScheduler(true)
|
||||
}
|
||||
|
||||
// Start RALs
|
||||
if cfg.RalsCfg().RALsEnabled {
|
||||
go startRater(internalRaterChan, cacheS, internalThresholdSChan,
|
||||
internalStatSChan, srvManager, server, dm, loadDb, cdrDb,
|
||||
&stopHandled, exitChan, cacheS, filterSChan, internalCacheSChan)
|
||||
}
|
||||
|
||||
// Start CDR Server
|
||||
if cfg.CdrsCfg().CDRSEnabled {
|
||||
go startCDRS(internalCdrSChan, cdrDb, dm,
|
||||
@@ -1475,7 +1479,7 @@ func main() {
|
||||
go startAnalyzerService(internalAnalyzerSChan, server, exitChan)
|
||||
}
|
||||
|
||||
go startloaderS(cfg, dm, server, exitChan, filterSChan, internalCacheSChan)
|
||||
go startLoaderS(cfg, dm, server, exitChan, filterSChan, internalCacheSChan)
|
||||
|
||||
// Serve rpc connections
|
||||
go startRpc(server, internalRaterChan, internalCdrSChan,
|
||||
|
||||
@@ -34,8 +34,10 @@ import (
|
||||
func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS,
|
||||
internalThdSChan, internalStatSChan chan rpcclient.RpcClientConnection,
|
||||
serviceManager *servmanager.ServiceManager, server *utils.Server,
|
||||
dm *engine.DataManager, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, stopHandled *bool,
|
||||
exitChan chan bool, filterSChan chan *engine.FilterS, internalCacheSChan chan rpcclient.RpcClientConnection) {
|
||||
dm *engine.DataManager, loadDb engine.LoadStorage, cdrDb engine.CdrStorage,
|
||||
stopHandled *bool, exitChan chan bool, chS *engine.CacheS, // separate from channel for optimization
|
||||
filterSChan chan *engine.FilterS,
|
||||
cacheSChan chan rpcclient.RpcClientConnection) {
|
||||
filterS := <-filterSChan
|
||||
filterSChan <- filterS
|
||||
var waitTasks []chan struct{}
|
||||
@@ -43,15 +45,15 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
|
||||
waitTasks = append(waitTasks, cacheTaskChan)
|
||||
go func() { //Wait for cache load
|
||||
defer close(cacheTaskChan)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheDestinations)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheReverseDestinations)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheRatingPlans)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheRatingProfiles)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheActions)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheActionPlans)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheAccountActionPlans)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheActionTriggers)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheSharedGroups)
|
||||
<-chS.GetPrecacheChannel(utils.CacheDestinations)
|
||||
<-chS.GetPrecacheChannel(utils.CacheReverseDestinations)
|
||||
<-chS.GetPrecacheChannel(utils.CacheRatingPlans)
|
||||
<-chS.GetPrecacheChannel(utils.CacheRatingProfiles)
|
||||
<-chS.GetPrecacheChannel(utils.CacheActions)
|
||||
<-chS.GetPrecacheChannel(utils.CacheActionPlans)
|
||||
<-chS.GetPrecacheChannel(utils.CacheAccountActionPlans)
|
||||
<-chS.GetPrecacheChannel(utils.CacheActionTriggers)
|
||||
<-chS.GetPrecacheChannel(utils.CacheSharedGroups)
|
||||
}()
|
||||
|
||||
var thdS *rpcclient.RpcClientPool
|
||||
@@ -99,19 +101,19 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
|
||||
}
|
||||
|
||||
//create cache connection
|
||||
var caches *rpcclient.RpcClientPool
|
||||
var cacheSrpc *rpcclient.RpcClientPool
|
||||
if len(cfg.ApierCfg().CachesConns) != 0 {
|
||||
cachesTaskChan := make(chan struct{})
|
||||
waitTasks = append(waitTasks, cachesTaskChan)
|
||||
go func() {
|
||||
defer close(cachesTaskChan)
|
||||
var err error
|
||||
caches, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
cacheSrpc, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
cfg.TlsCfg().ClientKey,
|
||||
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
|
||||
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
|
||||
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
|
||||
cfg.ApierCfg().CachesConns, internalCacheSChan,
|
||||
cfg.ApierCfg().CachesConns, cacheSChan,
|
||||
cfg.GeneralCfg().InternalTtl, false)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<APIer> Could not connect to CacheS, error: %s", err.Error()))
|
||||
@@ -120,18 +122,20 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
|
||||
}
|
||||
}()
|
||||
}
|
||||
//add verification here
|
||||
if caches != nil && reflect.ValueOf(caches).IsNil() {
|
||||
caches = nil
|
||||
}
|
||||
|
||||
// Wait for all connections to complete before going further
|
||||
for _, chn := range waitTasks {
|
||||
<-chn
|
||||
}
|
||||
|
||||
responder := &engine.Responder{
|
||||
ExitChan: exitChan,
|
||||
MaxComputedUsage: cfg.RalsCfg().RALsMaxComputedUsage}
|
||||
|
||||
// correct reflect on cacheS since there is no APIer init
|
||||
if cacheSrpc != nil && reflect.ValueOf(cacheSrpc).IsNil() {
|
||||
cacheSrpc = nil
|
||||
}
|
||||
apierRpcV1 := &v1.ApierV1{
|
||||
StorDb: loadDb,
|
||||
DataManager: dm,
|
||||
@@ -142,7 +146,8 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
|
||||
HTTPPoster: engine.NewHTTPPoster(cfg.GeneralCfg().HttpSkipTlsVerify,
|
||||
cfg.GeneralCfg().ReplyTimeout),
|
||||
FilterS: filterS,
|
||||
CacheS: caches}
|
||||
CacheS: cacheSrpc}
|
||||
|
||||
if thdS != nil {
|
||||
engine.SetThresholdS(thdS) // temporary architectural fix until we will have separate AccountS
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
@@ -76,7 +77,7 @@ func InitCache(cfg config.CacheCfg) {
|
||||
Cache = ltcache.NewTransCache(cfg.AsTransCacheConfig())
|
||||
}
|
||||
|
||||
// NewCacheS initializes the Cache service
|
||||
// NewCacheS initializes the Cache service and executes the precaching
|
||||
func NewCacheS(cfg *config.CGRConfig, dm *DataManager) (c *CacheS) {
|
||||
InitCache(cfg.CacheCfg()) // to make sure we start with correct config
|
||||
c = &CacheS{cfg: cfg, dm: dm,
|
||||
@@ -104,22 +105,45 @@ func (chS *CacheS) GetPrecacheChannel(chID string) chan struct{} {
|
||||
|
||||
// Precache loads data from DataDB into cache at engine start
|
||||
func (chS *CacheS) Precache() (err error) {
|
||||
var wg sync.WaitGroup // wait for precache to finish
|
||||
errChan := make(chan error)
|
||||
doneChan := make(chan struct{})
|
||||
for cacheID, cacheCfg := range chS.cfg.CacheCfg() {
|
||||
if !precachedPartitions.HasKey(cacheID) {
|
||||
continue
|
||||
}
|
||||
if cacheCfg.Precache {
|
||||
if err = chS.dm.CacheDataFromDB(
|
||||
utils.CacheInstanceToPrefix[cacheID], nil,
|
||||
false); err != nil {
|
||||
return
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
errCache := chS.dm.CacheDataFromDB(
|
||||
utils.CacheInstanceToPrefix[cacheID], nil,
|
||||
false)
|
||||
if errCache != nil {
|
||||
errChan <- errCache
|
||||
}
|
||||
close(chS.pcItems[cacheID])
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
close(chS.pcItems[cacheID])
|
||||
}
|
||||
go func() { // report wg.Wait on doneChan
|
||||
wg.Wait()
|
||||
close(doneChan)
|
||||
}()
|
||||
select {
|
||||
case err = <-errChan:
|
||||
case <-doneChan:
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// APIs start here
|
||||
|
||||
// Call gives the ability of CacheS to be passed as internal RPC
|
||||
func (chS *CacheS) Call(serviceMethod string, args interface{}, reply interface{}) error {
|
||||
return utils.RPCCall(chS, serviceMethod, args, reply)
|
||||
}
|
||||
|
||||
type ArgsGetCacheItemIDs struct {
|
||||
CacheID string
|
||||
ItemIDPrefix string
|
||||
|
||||
@@ -28,7 +28,7 @@ import (
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
func NewRPCPool(dispatchStrategy string, key_path, cert_path, ca_path string, connAttempts, reconnects int,
|
||||
func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connAttempts, reconnects int,
|
||||
connectTimeout, replyTimeout time.Duration, rpcConnCfgs []*config.HaPoolConfig,
|
||||
internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration, lazyConnect bool) (*rpcclient.RpcClientPool, error) {
|
||||
var rpcClient *rpcclient.RpcClient
|
||||
@@ -44,14 +44,14 @@ func NewRPCPool(dispatchStrategy string, key_path, cert_path, ca_path string, co
|
||||
case <-time.After(ttl):
|
||||
return nil, errors.New("TTL triggered")
|
||||
}
|
||||
rpcClient, err = rpcclient.NewRpcClient("", "", rpcConnCfg.Tls, key_path, cert_path, ca_path, connAttempts,
|
||||
rpcClient, err = rpcclient.NewRpcClient("", "", rpcConnCfg.Tls, keyPath, certPath, caPath, connAttempts,
|
||||
reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn, lazyConnect)
|
||||
} else if utils.IsSliceMember([]string{utils.MetaJSONrpc, utils.MetaGOBrpc, ""}, rpcConnCfg.Transport) {
|
||||
codec := utils.GOB
|
||||
if rpcConnCfg.Transport != "" {
|
||||
codec = rpcConnCfg.Transport[1:] // Transport contains always * before codec understood by rpcclient
|
||||
}
|
||||
rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, rpcConnCfg.Tls, key_path, cert_path, ca_path,
|
||||
rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, rpcConnCfg.Tls, keyPath, certPath, caPath,
|
||||
connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil, lazyConnect)
|
||||
} else {
|
||||
return nil, fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport)
|
||||
|
||||
@@ -220,7 +220,7 @@ func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) (
|
||||
}
|
||||
|
||||
// Call implements rpcclient.RpcClientConnection interface for internal RPC
|
||||
// here for cases when passing StatsService as rpccclient.RpcClientConnection (ie. in ResourceS)
|
||||
// here for cases when passing StatsService as rpccclient.RpcClientConnection
|
||||
func (ss *StatService) Call(serviceMethod string, args interface{}, reply interface{}) error {
|
||||
return utils.RPCCall(ss, serviceMethod, args, reply)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user