From 7cb4aea488b59ed0a534e9eb5a6a5cea485114f1 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 7 Mar 2018 18:44:36 +0100 Subject: [PATCH] Precaching done by CacheS --- cmd/cgr-engine/cgr-engine.go | 133 +++++++++++++++++++++-------------- cmd/cgr-engine/rater.go | 97 +++++-------------------- engine/caches.go | 71 ++++++++++++++++--- engine/datamanager.go | 3 +- glide.lock | 2 +- servmanager/servmanager.go | 11 +-- utils/consts.go | 1 + 7 files changed, 170 insertions(+), 148 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 0116b1751..c7ad82972 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -492,10 +492,13 @@ func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, dm * } // startAttributeService fires up the AttributeS -func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, - dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { +func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnection, + cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager, + server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { filterS := <-filterSChan filterSChan <- filterS + <-cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles) + aS, err := engine.NewAttributeService(dm, filterS, cfg.AttributeSCfg().StringIndexedFields, cfg.AttributeSCfg().PrefixIndexedFields) if err != nil { @@ -516,7 +519,8 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec internalAttributeSChan <- aSv1 } -func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, +func startResourceService(internalRsChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, + internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { var err error var thdSConn *rpcclient.RpcClientPool @@ -531,6 +535,9 @@ func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient. return } } + <-cacheS.GetPrecacheChannel(utils.CacheResourceProfiles) + <-cacheS.GetPrecacheChannel(utils.CacheResources) + rS, err := engine.NewResourceService(dm, cfg.ResourceSCfg().StoreInterval, thdSConn, filterS, cfg.ResourceSCfg().StringIndexedFields, cfg.ResourceSCfg().PrefixIndexedFields) if err != nil { @@ -554,7 +561,8 @@ func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient. } // startStatService fires up the StatS -func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, +func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, + internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { var err error var thdSConn *rpcclient.RpcClientPool @@ -569,6 +577,9 @@ func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.R return } } + <-cacheS.GetPrecacheChannel(utils.CacheStatQueueProfiles) + <-cacheS.GetPrecacheChannel(utils.CacheStatQueues) + sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval, thdSConn, filterS, cfg.StatSCfg().StringIndexedFields, cfg.StatSCfg().PrefixIndexedFields) if err != nil { @@ -591,10 +602,14 @@ func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.R } // startThresholdService fires up the ThresholdS -func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, - dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { +func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnection, + cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager, + server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { filterS := <-filterSChan filterSChan <- filterS + <-cacheS.GetPrecacheChannel(utils.CacheThresholdProfiles) + <-cacheS.GetPrecacheChannel(utils.CacheThresholds) + tS, err := engine.NewThresholdService(dm, cfg.ThresholdSCfg().StringIndexedFields, cfg.ThresholdSCfg().PrefixIndexedFields, cfg.ThresholdSCfg().StoreInterval, filterS) if err != nil { @@ -617,8 +632,10 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec } // startSupplierService fires up the ThresholdS -func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan chan rpcclient.RpcClientConnection, - cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { +func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, + internalRsChan, internalStatSChan chan rpcclient.RpcClientConnection, + cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, + exitChan chan bool, filterSChan chan *engine.FilterS) { var err error filterS := <-filterSChan filterSChan <- filterS @@ -647,6 +664,8 @@ func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSCh return } } + <-cacheS.GetPrecacheChannel(utils.CacheSupplierProfiles) + splS, err := engine.NewSupplierService(dm, cfg.DefaultTimezone, filterS, cfg.SupplierSCfg().StringIndexedFields, cfg.SupplierSCfg().PrefixIndexedFields, resourceSConn, statSConn) if err != nil { @@ -670,10 +689,10 @@ func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSCh } // startFilterService fires up the FilterS -func startFilterService(filterSChan chan *engine.FilterS, +func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dm *engine.DataManager, exitChan chan bool) { - + <-cacheS.GetPrecacheChannel(utils.CacheFilters) filterSChan <- engine.NewFilterS(cfg, internalStatSChan, dm) } @@ -781,7 +800,7 @@ func main() { cfg.NodeID = *nodeID } config.SetCgrConfig(cfg) // Share the config object - engine.InitCache(cfg.CacheCfg()) + // init syslog if err = initLogger(cfg); err != nil { log.Fatalf("Could not initialize syslog connection, err: <%s>", err.Error()) @@ -795,39 +814,34 @@ func main() { var loadDb engine.LoadStorage var cdrDb engine.CdrStorage var dm *engine.DataManager - - if cfg.RALsEnabled || cfg.CDRStatsEnabled || cfg.PubSubServerEnabled || - cfg.AliasesServerEnabled || cfg.UserServerEnabled || cfg.SchedulerEnabled { - dm, err = engine.ConfigureDataStorage(cfg.DataDbType, cfg.DataDbHost, cfg.DataDbPort, - cfg.DataDbName, cfg.DataDbUser, cfg.DataDbPass, cfg.DBDataEncoding, cfg.CacheCfg(), cfg.LoadHistorySize) - if err != nil { // Cannot configure getter database, show stopper - utils.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err)) - return - } - defer dm.DataDB().Close() - engine.SetDataStorage(dm) - if err := engine.CheckVersions(dm.DataDB()); err != nil { - fmt.Println(err.Error()) - return - } + // FixMe: add here exceptions if running only CDRC + dm, err = engine.ConfigureDataStorage(cfg.DataDbType, cfg.DataDbHost, cfg.DataDbPort, + cfg.DataDbName, cfg.DataDbUser, cfg.DataDbPass, cfg.DBDataEncoding, cfg.CacheCfg(), cfg.LoadHistorySize) + if err != nil { // Cannot configure getter database, show stopper + utils.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err)) + return } - if cfg.RALsEnabled || cfg.CDRSEnabled || cfg.SchedulerEnabled { // Only connect to storDb if necessary - storDb, err := engine.ConfigureStorStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, - cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns, - cfg.StorDBMaxIdleConns, cfg.StorDBConnMaxLifetime, cfg.StorDBCDRSIndexes) - if err != nil { // Cannot configure logger database, show stopper - utils.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err)) - return - } - defer storDb.Close() - // loadDb,cdrDb and storDb are all mapped on the same stordb storage - loadDb = storDb.(engine.LoadStorage) - cdrDb = storDb.(engine.CdrStorage) - engine.SetCdrStorage(cdrDb) - if err := engine.CheckVersions(storDb); err != nil { - fmt.Println(err.Error()) - return - } + defer dm.DataDB().Close() + engine.SetDataStorage(dm) + if err := engine.CheckVersions(dm.DataDB()); err != nil { + fmt.Println(err.Error()) + return + } + storDb, err := engine.ConfigureStorStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, + cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns, + cfg.StorDBMaxIdleConns, cfg.StorDBConnMaxLifetime, cfg.StorDBCDRSIndexes) + if err != nil { // Cannot configure logger database, show stopper + utils.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err)) + return + } + defer storDb.Close() + // loadDb,cdrDb and storDb are all mapped on the same stordb storage + loadDb = storDb.(engine.LoadStorage) + cdrDb = storDb.(engine.CdrStorage) + engine.SetCdrStorage(cdrDb) + if err := engine.CheckVersions(storDb); err != nil { + fmt.Println(err.Error()) + return } // Done initing DBs @@ -836,6 +850,18 @@ func main() { engine.SetLcrSubjectPrefixMatching(cfg.LcrSubjectPrefixMatching) stopHandled := false + // init cache + cacheS := engine.NewCacheS(cfg, dm) + go func() { + if err := cacheS.Precache(); err != nil { + errCGR := err.(*utils.CGRError) + errCGR.ActivateLongError() + utils.Logger.Crit(fmt.Sprintf("<%s> error: %s on precache", + utils.CacheS, err.Error())) + exitChan <- true + } + }() + // Rpc/http server server := new(utils.Server) @@ -843,7 +869,6 @@ func main() { // Define internal connections via channels internalRaterChan := make(chan rpcclient.RpcClientConnection, 1) - cacheDoneChan := make(chan struct{}, 1) internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1) internalCdrStatSChan := make(chan rpcclient.RpcClientConnection, 1) internalPubSubSChan := make(chan rpcclient.RpcClientConnection, 1) @@ -858,11 +883,11 @@ func main() { filterSChan := make(chan *engine.FilterS, 1) // Start ServiceManager - srvManager := servmanager.NewServiceManager(cfg, dm, exitChan, cacheDoneChan) + srvManager := servmanager.NewServiceManager(cfg, dm, exitChan, cacheS) // Start rater service if cfg.RALsEnabled { - go startRater(internalRaterChan, cacheDoneChan, internalThresholdSChan, + go startRater(internalRaterChan, cacheS, internalThresholdSChan, internalCdrStatSChan, internalStatSChan, internalPubSubSChan, internalAttributeSChan, internalUserSChan, internalAliaseSChan, @@ -934,28 +959,32 @@ func main() { go startUsersServer(internalUserSChan, dm, server, exitChan) } // Start FilterS - go startFilterService(filterSChan, internalStatSChan, cfg, dm, exitChan) + go startFilterService(filterSChan, cacheS, internalStatSChan, cfg, dm, exitChan) if cfg.AttributeSCfg().Enabled { - go startAttributeService(internalAttributeSChan, cfg, dm, server, exitChan, filterSChan) + go startAttributeService(internalAttributeSChan, cacheS, + cfg, dm, server, exitChan, filterSChan) } // Start RL service if cfg.ResourceSCfg().Enabled { - go startResourceService(internalRsChan, + go startResourceService(internalRsChan, cacheS, internalThresholdSChan, cfg, dm, server, exitChan, filterSChan) } if cfg.StatSCfg().Enabled { - go startStatService(internalStatSChan, internalThresholdSChan, cfg, dm, server, exitChan, filterSChan) + go startStatService(internalStatSChan, cacheS, + internalThresholdSChan, cfg, dm, server, exitChan, filterSChan) } if cfg.ThresholdSCfg().Enabled { - go startThresholdService(internalThresholdSChan, cfg, dm, server, exitChan, filterSChan) + go startThresholdService(internalThresholdSChan, cacheS, + cfg, dm, server, exitChan, filterSChan) } if cfg.SupplierSCfg().Enabled { - go startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan, + go startSupplierService(internalSupplierSChan, cacheS, + internalRsChan, internalStatSChan, cfg, dm, server, exitChan, filterSChan) } diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index 881416aca..8b9190d31 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -30,95 +30,30 @@ import ( ) // Starts rater and reports on chan -func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{}, +func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, internalThdSChan, internalCdrStatSChan, internalStatSChan, internalPubSubSChan, internalAttributeSChan, internalUserSChan, internalAliaseSChan chan rpcclient.RpcClientConnection, serviceManager *servmanager.ServiceManager, server *utils.Server, dm *engine.DataManager, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, stopHandled *bool, exitChan chan bool) { var waitTasks []chan struct{} - cacheCfg := cfg.CacheCfg() - //Cache load cacheTaskChan := make(chan struct{}) waitTasks = append(waitTasks, cacheTaskChan) - go func() { + go func() { //Wait for cache load defer close(cacheTaskChan) - var dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rspIDs, resIDs, stqIDs, stqpIDs, thIDs, thpIDs, fltrIDs, sppIDs, alsPrfIDs []string - if cCfg, has := cacheCfg[utils.CacheDestinations]; !has || !cCfg.Precache { - dstIDs = make([]string, 0) // Don't cache any - } - if cCfg, has := cacheCfg[utils.CacheReverseDestinations]; !has || !cCfg.Precache { - rvDstIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheRatingPlans]; !has || !cCfg.Precache { - rplIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheRatingProfiles]; !has || !cCfg.Precache { - rpfIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheActions]; !has || !cCfg.Precache { - actIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheActionPlans]; !has || !cCfg.Precache { - aplIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheAccountActionPlans]; !has || !cCfg.Precache { - aapIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheActionTriggers]; !has || !cCfg.Precache { - atrgIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheSharedGroups]; !has || !cCfg.Precache { - sgIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheLCRRules]; !has || !cCfg.Precache { - lcrIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheDerivedChargers]; !has || !cCfg.Precache { - dcIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheAliases]; !has || !cCfg.Precache { - alsIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheReverseAliases]; !has || !cCfg.Precache { - rvAlsIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheResourceProfiles]; !has || !cCfg.Precache { - rspIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheResources]; !has || !cCfg.Precache { - resIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheStatQueues]; !has || !cCfg.Precache { - stqIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheStatQueueProfiles]; !has || !cCfg.Precache { - stqpIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheThresholds]; !has || !cCfg.Precache { - thIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheThresholdProfiles]; !has || !cCfg.Precache { - thpIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheFilters]; !has || !cCfg.Precache { - fltrIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheSupplierProfiles]; !has || !cCfg.Precache { - sppIDs = make([]string, 0) - } - if cCfg, has := cacheCfg[utils.CacheAttributeProfiles]; !has || !cCfg.Precache { - alsPrfIDs = make([]string, 0) - } - - // ToDo: Add here timings - if err := dm.LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, - atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rspIDs, resIDs, stqIDs, - stqpIDs, thIDs, thpIDs, fltrIDs, sppIDs, alsPrfIDs); err != nil { - utils.Logger.Crit(fmt.Sprintf(" Cache rating error: %s", err.Error())) - exitChan <- true - return - } - cacheDoneChan <- struct{}{} + <-cacheS.GetPrecacheChannel(utils.CacheDestinations) + <-cacheS.GetPrecacheChannel(utils.CacheReverseDestinations) + <-cacheS.GetPrecacheChannel(utils.CacheRatingPlans) + <-cacheS.GetPrecacheChannel(utils.CacheRatingProfiles) + <-cacheS.GetPrecacheChannel(utils.CacheActions) + <-cacheS.GetPrecacheChannel(utils.CacheActionPlans) + <-cacheS.GetPrecacheChannel(utils.CacheAccountActionPlans) + <-cacheS.GetPrecacheChannel(utils.CacheActionTriggers) + <-cacheS.GetPrecacheChannel(utils.CacheActionTriggers) + <-cacheS.GetPrecacheChannel(utils.CacheSharedGroups) + <-cacheS.GetPrecacheChannel(utils.CacheLCRRules) + <-cacheS.GetPrecacheChannel(utils.CacheDerivedChargers) + <-cacheS.GetPrecacheChannel(utils.CacheAliases) + <-cacheS.GetPrecacheChannel(utils.CacheReverseAliases) }() var thdS *rpcclient.RpcClientPool diff --git a/engine/caches.go b/engine/caches.go index 1114d990d..1812efd3b 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -20,6 +20,7 @@ package engine import ( "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" "github.com/cgrates/ltcache" ) @@ -29,6 +30,33 @@ func init() { InitCache(nil) } +var precachedPartitions = []string{ + utils.CacheDestinations, + utils.CacheReverseDestinations, + utils.CacheRatingPlans, + utils.CacheRatingProfiles, + utils.CacheLCRRules, + utils.CacheActions, + utils.CacheActionPlans, + utils.CacheAccountActionPlans, + utils.CacheActionTriggers, + utils.CacheSharedGroups, + utils.CacheAliases, + utils.CacheReverseAliases, + utils.CacheDerivedChargers, + utils.CacheResourceProfiles, + utils.CacheResources, + utils.CacheEventResources, + utils.CacheTimings, + utils.CacheStatQueueProfiles, + utils.CacheStatQueues, + utils.CacheThresholdProfiles, + utils.CacheThresholds, + utils.CacheFilters, + utils.CacheSupplierProfiles, + utils.CacheAttributeProfiles, +} + // InitCache will instantiate the cache with specific or default configuraiton func InitCache(cfg config.CacheConfig) { if cfg == nil { @@ -37,19 +65,46 @@ func InitCache(cfg config.CacheConfig) { Cache = ltcache.NewTransCache(cfg.AsTransCacheConfig()) } +// NewCacheS initializes the Cache service func NewCacheS(cfg *config.CGRConfig, dm *DataManager) (c *CacheS) { - InitCache(cfg.CacheCfg()) // make sure we use the same config as package shared one + InitCache(cfg.CacheCfg()) // to make sure we start with correct config c = &CacheS{cfg: cfg, dm: dm, - cItems: make(map[string]chan struct{})} - for k := range cfg.CacheCfg() { - c.cItems[k] = make(chan struct{}) + pcItems: make(map[string]chan struct{})} + for cacheID := range cfg.CacheCfg() { + if !utils.IsSliceMember(precachedPartitions, cacheID) { + continue + } + c.pcItems[cacheID] = make(chan struct{}) } return } -// CacheS deals with cache preload and other cache related tasks +// CacheS deals with cache preload and other cache related tasks/APIs type CacheS struct { - cfg *config.CGRConfig - dm *DataManager - cItems map[string]chan struct{} // signal precaching done + cfg *config.CGRConfig + dm *DataManager + pcItems map[string]chan struct{} // signal precaching +} + +// GetChannel returns the channel used to signal precaching +func (chS *CacheS) GetPrecacheChannel(chID string) chan struct{} { + return chS.pcItems[chID] +} + +// Precache loads data from DataDB into cache at engine start +func (chS *CacheS) Precache() (err error) { + for cacheID, cacheCfg := range chS.cfg.CacheCfg() { + if !utils.IsSliceMember(precachedPartitions, cacheID) { + continue + } + if cacheCfg.Precache { + if err = chS.dm.CacheDataFromDB( + utils.CacheInstanceToPrefix[cacheID], nil, + false); err != nil { + return + } + } + close(chS.pcItems[cacheID]) + } + return } diff --git a/engine/datamanager.go b/engine/datamanager.go index 146fb16ad..edbb2c67c 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -125,7 +125,8 @@ func (dm *DataManager) PreloadCacheForPrefix(prefix string) error { } func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached bool) (err error) { - if !utils.IsSliceMember([]string{utils.DESTINATION_PREFIX, + if !utils.IsSliceMember([]string{ + utils.DESTINATION_PREFIX, utils.REVERSE_DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, diff --git a/glide.lock b/glide.lock index e261bb22a..53f406422 100644 --- a/glide.lock +++ b/glide.lock @@ -16,7 +16,7 @@ imports: - name: github.com/cgrates/kamevapi version: e4dfe7d6cb5bb0872111fe7d61af0e1a19eda485 - name: github.com/cgrates/ltcache - version: 9416f986a2dc5f0ea5cb2abd5bc8c5fc99355280 + version: 3aface1319c71e5050445fc659361a59bbeeedff - name: github.com/cgrates/osipsdagram version: 3d6beed663452471dec3ca194137a30d379d9e8f - name: github.com/cgrates/radigo diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 864ebd476..bee26dda0 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -31,8 +31,10 @@ import ( "github.com/cgrates/rpcclient" ) -func NewServiceManager(cfg *config.CGRConfig, dm *engine.DataManager, engineShutdown chan bool, cacheDoneChan chan struct{}) *ServiceManager { - return &ServiceManager{cfg: cfg, dm: dm, engineShutdown: engineShutdown, cacheDoneChan: cacheDoneChan} +func NewServiceManager(cfg *config.CGRConfig, dm *engine.DataManager, + engineShutdown chan bool, cacheS *engine.CacheS) *ServiceManager { + return &ServiceManager{cfg: cfg, dm: dm, + engineShutdown: engineShutdown, cacheS: cacheS} } // ServiceManager handles service management ran by the engine @@ -41,7 +43,7 @@ type ServiceManager struct { cfg *config.CGRConfig dm *engine.DataManager engineShutdown chan bool - cacheDoneChan chan struct{} // Wait for cache to load + cacheS *engine.CacheS sched *scheduler.Scheduler rpcChans map[string]chan rpcclient.RpcClientConnection // services expected to start rpcServices map[string]rpcclient.RpcClientConnection // services started @@ -58,8 +60,7 @@ func (srvMngr *ServiceManager) StartScheduler(waitCache bool) error { "the scheduler is already running") } if waitCache { // Wait for cache to load data before starting - cacheDone := <-srvMngr.cacheDoneChan - srvMngr.cacheDoneChan <- cacheDone + <-srvMngr.cacheS.GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached } utils.Logger.Info(" Starting CGRateS Scheduler.") sched := scheduler.NewScheduler(srvMngr.dm) diff --git a/utils/consts.go b/utils/consts.go index c382a4cde..90edf174d 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -521,6 +521,7 @@ const ( DiameterAgent = "DiameterAgent" Error = "Error" MetaCGRReply = "*cgrReply" + CacheS = "CacheS" ) //MetaMetrics