diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index b561a9484..17f162475 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -903,14 +903,14 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, } // loaderService will start and register APIs for LoaderService if enabled -func startloaderS(cfg *config.CGRConfig, +func startLoaderS(cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, exitChan chan bool, - filterSChan chan *engine.FilterS, internalCacheSChan chan rpcclient.RpcClientConnection) { + filterSChan chan *engine.FilterS, cacheSChan chan rpcclient.RpcClientConnection) { filterS := <-filterSChan filterSChan <- filterS ldrS := loaders.NewLoaderService(dm, cfg.LoaderCfg(), - cfg.GeneralCfg().DefaultTimezone, exitChan, filterS, internalCacheSChan) + cfg.GeneralCfg().DefaultTimezone, exitChan, filterS, cacheSChan) if !ldrS.Enabled() { return } @@ -1034,6 +1034,24 @@ func startAnalyzerService(internalAnalyzerSChan chan rpcclient.RpcClientConnecti internalAnalyzerSChan <- aSv1 } +// initCacheS inits the CacheS and starts precaching as well as populating internal channel for RPC conns +func initCacheS(internalCacheSChan chan rpcclient.RpcClientConnection, + server *utils.Server, dm *engine.DataManager, exitChan chan bool) (chS *engine.CacheS) { + chS = engine.NewCacheS(cfg, dm) + go func() { + if err := chS.Precache(); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error())) + exitChan <- true + } + }() + + if !cfg.DispatcherSCfg().Enabled { + server.RpcRegister(v1.NewCacheSv1(chS)) + } + internalCacheSChan <- chS + return +} + func startRpc(server *utils.Server, internalRaterChan, internalCdrSChan, internalRsChan, internalStatSChan, internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan, @@ -1330,22 +1348,6 @@ func main() { // Rpc/http server server := utils.NewServer() - // init cache - cacheS := engine.NewCacheS(cfg, dm) - cacheSv1 := v1.NewCacheSv1(cacheS) - if !cfg.DispatcherSCfg().Enabled { - server.RpcRegister(cacheSv1) // before pre-caching so we can check status via API - } - go func() { - if err := cacheS.Precache(); err != nil { - errCGR := err.(*utils.CGRError) - errCGR.ActivateLongError() - utils.Logger.Crit(fmt.Sprintf("<%s> error: %s on precache", - utils.CacheS, err.Error())) - exitChan <- true - } - }() - if *httpPprofPath != "" { go server.RegisterProfiler(*httpPprofPath) } @@ -1365,23 +1367,25 @@ func main() { internalDispatcherSChan := make(chan *dispatchers.DispatcherService, 1) internalAnalyzerSChan := make(chan rpcclient.RpcClientConnection, 1) internalCacheSChan := make(chan rpcclient.RpcClientConnection, 1) - //Add cacheSv1 into channel - internalCacheSChan <- cacheSv1 + + // init CacheS + cacheS := initCacheS(internalCacheSChan, server, dm, exitChan) + // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, dm, exitChan, cacheS) - // Start rater service - if cfg.RalsCfg().RALsEnabled { - go startRater(internalRaterChan, cacheS, internalThresholdSChan, - internalStatSChan, srvManager, server, dm, loadDb, cdrDb, - &stopHandled, exitChan, filterSChan, internalCacheSChan) - } - // Start Scheduler if cfg.SchedulerCfg().Enabled { go srvManager.StartScheduler(true) } + // Start RALs + if cfg.RalsCfg().RALsEnabled { + go startRater(internalRaterChan, cacheS, internalThresholdSChan, + internalStatSChan, srvManager, server, dm, loadDb, cdrDb, + &stopHandled, exitChan, cacheS, filterSChan, internalCacheSChan) + } + // Start CDR Server if cfg.CdrsCfg().CDRSEnabled { go startCDRS(internalCdrSChan, cdrDb, dm, @@ -1475,7 +1479,7 @@ func main() { go startAnalyzerService(internalAnalyzerSChan, server, exitChan) } - go startloaderS(cfg, dm, server, exitChan, filterSChan, internalCacheSChan) + go startLoaderS(cfg, dm, server, exitChan, filterSChan, internalCacheSChan) // Serve rpc connections go startRpc(server, internalRaterChan, internalCdrSChan, diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index e15cf157a..4f28a2fe8 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -34,8 +34,10 @@ import ( func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, internalThdSChan, internalStatSChan chan rpcclient.RpcClientConnection, serviceManager *servmanager.ServiceManager, server *utils.Server, - dm *engine.DataManager, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, stopHandled *bool, - exitChan chan bool, filterSChan chan *engine.FilterS, internalCacheSChan chan rpcclient.RpcClientConnection) { + dm *engine.DataManager, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, + stopHandled *bool, exitChan chan bool, chS *engine.CacheS, // separate from channel for optimization + filterSChan chan *engine.FilterS, + cacheSChan chan rpcclient.RpcClientConnection) { filterS := <-filterSChan filterSChan <- filterS var waitTasks []chan struct{} @@ -43,15 +45,15 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en waitTasks = append(waitTasks, cacheTaskChan) go func() { //Wait for cache load defer close(cacheTaskChan) - <-cacheS.GetPrecacheChannel(utils.CacheDestinations) - <-cacheS.GetPrecacheChannel(utils.CacheReverseDestinations) - <-cacheS.GetPrecacheChannel(utils.CacheRatingPlans) - <-cacheS.GetPrecacheChannel(utils.CacheRatingProfiles) - <-cacheS.GetPrecacheChannel(utils.CacheActions) - <-cacheS.GetPrecacheChannel(utils.CacheActionPlans) - <-cacheS.GetPrecacheChannel(utils.CacheAccountActionPlans) - <-cacheS.GetPrecacheChannel(utils.CacheActionTriggers) - <-cacheS.GetPrecacheChannel(utils.CacheSharedGroups) + <-chS.GetPrecacheChannel(utils.CacheDestinations) + <-chS.GetPrecacheChannel(utils.CacheReverseDestinations) + <-chS.GetPrecacheChannel(utils.CacheRatingPlans) + <-chS.GetPrecacheChannel(utils.CacheRatingProfiles) + <-chS.GetPrecacheChannel(utils.CacheActions) + <-chS.GetPrecacheChannel(utils.CacheActionPlans) + <-chS.GetPrecacheChannel(utils.CacheAccountActionPlans) + <-chS.GetPrecacheChannel(utils.CacheActionTriggers) + <-chS.GetPrecacheChannel(utils.CacheSharedGroups) }() var thdS *rpcclient.RpcClientPool @@ -99,19 +101,19 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en } //create cache connection - var caches *rpcclient.RpcClientPool + var cacheSrpc *rpcclient.RpcClientPool if len(cfg.ApierCfg().CachesConns) != 0 { cachesTaskChan := make(chan struct{}) waitTasks = append(waitTasks, cachesTaskChan) go func() { defer close(cachesTaskChan) var err error - caches, err = engine.NewRPCPool(rpcclient.POOL_FIRST, + cacheSrpc, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.ApierCfg().CachesConns, internalCacheSChan, + cfg.ApierCfg().CachesConns, cacheSChan, cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CacheS, error: %s", err.Error())) @@ -120,18 +122,20 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en } }() } - //add verification here - if caches != nil && reflect.ValueOf(caches).IsNil() { - caches = nil - } // Wait for all connections to complete before going further for _, chn := range waitTasks { <-chn } + responder := &engine.Responder{ ExitChan: exitChan, MaxComputedUsage: cfg.RalsCfg().RALsMaxComputedUsage} + + // correct reflect on cacheS since there is no APIer init + if cacheSrpc != nil && reflect.ValueOf(cacheSrpc).IsNil() { + cacheSrpc = nil + } apierRpcV1 := &v1.ApierV1{ StorDb: loadDb, DataManager: dm, @@ -142,7 +146,8 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en HTTPPoster: engine.NewHTTPPoster(cfg.GeneralCfg().HttpSkipTlsVerify, cfg.GeneralCfg().ReplyTimeout), FilterS: filterS, - CacheS: caches} + CacheS: cacheSrpc} + if thdS != nil { engine.SetThresholdS(thdS) // temporary architectural fix until we will have separate AccountS } diff --git a/engine/caches.go b/engine/caches.go index 1a5d5e041..9e941fc4f 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -20,6 +20,7 @@ package engine import ( "fmt" + "sync" "time" "github.com/cgrates/cgrates/config" @@ -76,7 +77,7 @@ func InitCache(cfg config.CacheCfg) { Cache = ltcache.NewTransCache(cfg.AsTransCacheConfig()) } -// NewCacheS initializes the Cache service +// NewCacheS initializes the Cache service and executes the precaching func NewCacheS(cfg *config.CGRConfig, dm *DataManager) (c *CacheS) { InitCache(cfg.CacheCfg()) // to make sure we start with correct config c = &CacheS{cfg: cfg, dm: dm, @@ -104,22 +105,45 @@ func (chS *CacheS) GetPrecacheChannel(chID string) chan struct{} { // Precache loads data from DataDB into cache at engine start func (chS *CacheS) Precache() (err error) { + var wg sync.WaitGroup // wait for precache to finish + errChan := make(chan error) + doneChan := make(chan struct{}) for cacheID, cacheCfg := range chS.cfg.CacheCfg() { if !precachedPartitions.HasKey(cacheID) { continue } if cacheCfg.Precache { - if err = chS.dm.CacheDataFromDB( - utils.CacheInstanceToPrefix[cacheID], nil, - false); err != nil { - return - } + wg.Add(1) + go func() { + errCache := chS.dm.CacheDataFromDB( + utils.CacheInstanceToPrefix[cacheID], nil, + false) + if errCache != nil { + errChan <- errCache + } + close(chS.pcItems[cacheID]) + wg.Done() + }() } - close(chS.pcItems[cacheID]) + } + go func() { // report wg.Wait on doneChan + wg.Wait() + close(doneChan) + }() + select { + case err = <-errChan: + case <-doneChan: } return } +// APIs start here + +// Call gives the ability of CacheS to be passed as internal RPC +func (chS *CacheS) Call(serviceMethod string, args interface{}, reply interface{}) error { + return utils.RPCCall(chS, serviceMethod, args, reply) +} + type ArgsGetCacheItemIDs struct { CacheID string ItemIDPrefix string diff --git a/engine/libengine.go b/engine/libengine.go index ca36b6e0b..435ce6ef6 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -28,7 +28,7 @@ import ( "github.com/cgrates/rpcclient" ) -func NewRPCPool(dispatchStrategy string, key_path, cert_path, ca_path string, connAttempts, reconnects int, +func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connAttempts, reconnects int, connectTimeout, replyTimeout time.Duration, rpcConnCfgs []*config.HaPoolConfig, internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration, lazyConnect bool) (*rpcclient.RpcClientPool, error) { var rpcClient *rpcclient.RpcClient @@ -44,14 +44,14 @@ func NewRPCPool(dispatchStrategy string, key_path, cert_path, ca_path string, co case <-time.After(ttl): return nil, errors.New("TTL triggered") } - rpcClient, err = rpcclient.NewRpcClient("", "", rpcConnCfg.Tls, key_path, cert_path, ca_path, connAttempts, + rpcClient, err = rpcclient.NewRpcClient("", "", rpcConnCfg.Tls, keyPath, certPath, caPath, connAttempts, reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn, lazyConnect) } else if utils.IsSliceMember([]string{utils.MetaJSONrpc, utils.MetaGOBrpc, ""}, rpcConnCfg.Transport) { codec := utils.GOB if rpcConnCfg.Transport != "" { codec = rpcConnCfg.Transport[1:] // Transport contains always * before codec understood by rpcclient } - rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, rpcConnCfg.Tls, key_path, cert_path, ca_path, + rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, rpcConnCfg.Tls, keyPath, certPath, caPath, connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil, lazyConnect) } else { return nil, fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport) diff --git a/engine/stats.go b/engine/stats.go index 737451617..167592bae 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -220,7 +220,7 @@ func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) ( } // Call implements rpcclient.RpcClientConnection interface for internal RPC -// here for cases when passing StatsService as rpccclient.RpcClientConnection (ie. in ResourceS) +// here for cases when passing StatsService as rpccclient.RpcClientConnection func (ss *StatService) Call(serviceMethod string, args interface{}, reply interface{}) error { return utils.RPCCall(ss, serviceMethod, args, reply) }