mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Precaching done by CacheS
This commit is contained in:
@@ -492,10 +492,13 @@ func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, dm *
|
||||
}
|
||||
|
||||
// startAttributeService fires up the AttributeS
|
||||
func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
|
||||
dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
|
||||
func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnection,
|
||||
cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager,
|
||||
server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
|
||||
filterS := <-filterSChan
|
||||
filterSChan <- filterS
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles)
|
||||
|
||||
aS, err := engine.NewAttributeService(dm, filterS,
|
||||
cfg.AttributeSCfg().StringIndexedFields, cfg.AttributeSCfg().PrefixIndexedFields)
|
||||
if err != nil {
|
||||
@@ -516,7 +519,8 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec
|
||||
internalAttributeSChan <- aSv1
|
||||
}
|
||||
|
||||
func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
|
||||
func startResourceService(internalRsChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS,
|
||||
internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
|
||||
dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
|
||||
var err error
|
||||
var thdSConn *rpcclient.RpcClientPool
|
||||
@@ -531,6 +535,9 @@ func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient.
|
||||
return
|
||||
}
|
||||
}
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheResourceProfiles)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheResources)
|
||||
|
||||
rS, err := engine.NewResourceService(dm, cfg.ResourceSCfg().StoreInterval,
|
||||
thdSConn, filterS, cfg.ResourceSCfg().StringIndexedFields, cfg.ResourceSCfg().PrefixIndexedFields)
|
||||
if err != nil {
|
||||
@@ -554,7 +561,8 @@ func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient.
|
||||
}
|
||||
|
||||
// startStatService fires up the StatS
|
||||
func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
|
||||
func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS,
|
||||
internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
|
||||
dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
|
||||
var err error
|
||||
var thdSConn *rpcclient.RpcClientPool
|
||||
@@ -569,6 +577,9 @@ func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.R
|
||||
return
|
||||
}
|
||||
}
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheStatQueueProfiles)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheStatQueues)
|
||||
|
||||
sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval,
|
||||
thdSConn, filterS, cfg.StatSCfg().StringIndexedFields, cfg.StatSCfg().PrefixIndexedFields)
|
||||
if err != nil {
|
||||
@@ -591,10 +602,14 @@ func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.R
|
||||
}
|
||||
|
||||
// startThresholdService fires up the ThresholdS
|
||||
func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
|
||||
dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
|
||||
func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnection,
|
||||
cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager,
|
||||
server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
|
||||
filterS := <-filterSChan
|
||||
filterSChan <- filterS
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheThresholdProfiles)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheThresholds)
|
||||
|
||||
tS, err := engine.NewThresholdService(dm, cfg.ThresholdSCfg().StringIndexedFields,
|
||||
cfg.ThresholdSCfg().PrefixIndexedFields, cfg.ThresholdSCfg().StoreInterval, filterS)
|
||||
if err != nil {
|
||||
@@ -617,8 +632,10 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec
|
||||
}
|
||||
|
||||
// startSupplierService fires up the ThresholdS
|
||||
func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan chan rpcclient.RpcClientConnection,
|
||||
cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
|
||||
func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS,
|
||||
internalRsChan, internalStatSChan chan rpcclient.RpcClientConnection,
|
||||
cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server,
|
||||
exitChan chan bool, filterSChan chan *engine.FilterS) {
|
||||
var err error
|
||||
filterS := <-filterSChan
|
||||
filterSChan <- filterS
|
||||
@@ -647,6 +664,8 @@ func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSCh
|
||||
return
|
||||
}
|
||||
}
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheSupplierProfiles)
|
||||
|
||||
splS, err := engine.NewSupplierService(dm, cfg.DefaultTimezone, filterS, cfg.SupplierSCfg().StringIndexedFields,
|
||||
cfg.SupplierSCfg().PrefixIndexedFields, resourceSConn, statSConn)
|
||||
if err != nil {
|
||||
@@ -670,10 +689,10 @@ func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSCh
|
||||
}
|
||||
|
||||
// startFilterService fires up the FilterS
|
||||
func startFilterService(filterSChan chan *engine.FilterS,
|
||||
func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS,
|
||||
internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
|
||||
dm *engine.DataManager, exitChan chan bool) {
|
||||
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheFilters)
|
||||
filterSChan <- engine.NewFilterS(cfg, internalStatSChan, dm)
|
||||
|
||||
}
|
||||
@@ -781,7 +800,7 @@ func main() {
|
||||
cfg.NodeID = *nodeID
|
||||
}
|
||||
config.SetCgrConfig(cfg) // Share the config object
|
||||
engine.InitCache(cfg.CacheCfg())
|
||||
|
||||
// init syslog
|
||||
if err = initLogger(cfg); err != nil {
|
||||
log.Fatalf("Could not initialize syslog connection, err: <%s>", err.Error())
|
||||
@@ -795,39 +814,34 @@ func main() {
|
||||
var loadDb engine.LoadStorage
|
||||
var cdrDb engine.CdrStorage
|
||||
var dm *engine.DataManager
|
||||
|
||||
if cfg.RALsEnabled || cfg.CDRStatsEnabled || cfg.PubSubServerEnabled ||
|
||||
cfg.AliasesServerEnabled || cfg.UserServerEnabled || cfg.SchedulerEnabled {
|
||||
dm, err = engine.ConfigureDataStorage(cfg.DataDbType, cfg.DataDbHost, cfg.DataDbPort,
|
||||
cfg.DataDbName, cfg.DataDbUser, cfg.DataDbPass, cfg.DBDataEncoding, cfg.CacheCfg(), cfg.LoadHistorySize)
|
||||
if err != nil { // Cannot configure getter database, show stopper
|
||||
utils.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
defer dm.DataDB().Close()
|
||||
engine.SetDataStorage(dm)
|
||||
if err := engine.CheckVersions(dm.DataDB()); err != nil {
|
||||
fmt.Println(err.Error())
|
||||
return
|
||||
}
|
||||
// FixMe: add here exceptions if running only CDRC
|
||||
dm, err = engine.ConfigureDataStorage(cfg.DataDbType, cfg.DataDbHost, cfg.DataDbPort,
|
||||
cfg.DataDbName, cfg.DataDbUser, cfg.DataDbPass, cfg.DBDataEncoding, cfg.CacheCfg(), cfg.LoadHistorySize)
|
||||
if err != nil { // Cannot configure getter database, show stopper
|
||||
utils.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
if cfg.RALsEnabled || cfg.CDRSEnabled || cfg.SchedulerEnabled { // Only connect to storDb if necessary
|
||||
storDb, err := engine.ConfigureStorStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort,
|
||||
cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns,
|
||||
cfg.StorDBMaxIdleConns, cfg.StorDBConnMaxLifetime, cfg.StorDBCDRSIndexes)
|
||||
if err != nil { // Cannot configure logger database, show stopper
|
||||
utils.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
defer storDb.Close()
|
||||
// loadDb,cdrDb and storDb are all mapped on the same stordb storage
|
||||
loadDb = storDb.(engine.LoadStorage)
|
||||
cdrDb = storDb.(engine.CdrStorage)
|
||||
engine.SetCdrStorage(cdrDb)
|
||||
if err := engine.CheckVersions(storDb); err != nil {
|
||||
fmt.Println(err.Error())
|
||||
return
|
||||
}
|
||||
defer dm.DataDB().Close()
|
||||
engine.SetDataStorage(dm)
|
||||
if err := engine.CheckVersions(dm.DataDB()); err != nil {
|
||||
fmt.Println(err.Error())
|
||||
return
|
||||
}
|
||||
storDb, err := engine.ConfigureStorStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort,
|
||||
cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns,
|
||||
cfg.StorDBMaxIdleConns, cfg.StorDBConnMaxLifetime, cfg.StorDBCDRSIndexes)
|
||||
if err != nil { // Cannot configure logger database, show stopper
|
||||
utils.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
defer storDb.Close()
|
||||
// loadDb,cdrDb and storDb are all mapped on the same stordb storage
|
||||
loadDb = storDb.(engine.LoadStorage)
|
||||
cdrDb = storDb.(engine.CdrStorage)
|
||||
engine.SetCdrStorage(cdrDb)
|
||||
if err := engine.CheckVersions(storDb); err != nil {
|
||||
fmt.Println(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Done initing DBs
|
||||
@@ -836,6 +850,18 @@ func main() {
|
||||
engine.SetLcrSubjectPrefixMatching(cfg.LcrSubjectPrefixMatching)
|
||||
stopHandled := false
|
||||
|
||||
// init cache
|
||||
cacheS := engine.NewCacheS(cfg, dm)
|
||||
go func() {
|
||||
if err := cacheS.Precache(); err != nil {
|
||||
errCGR := err.(*utils.CGRError)
|
||||
errCGR.ActivateLongError()
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> error: %s on precache",
|
||||
utils.CacheS, err.Error()))
|
||||
exitChan <- true
|
||||
}
|
||||
}()
|
||||
|
||||
// Rpc/http server
|
||||
server := new(utils.Server)
|
||||
|
||||
@@ -843,7 +869,6 @@ func main() {
|
||||
|
||||
// Define internal connections via channels
|
||||
internalRaterChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
cacheDoneChan := make(chan struct{}, 1)
|
||||
internalCdrSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalCdrStatSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
internalPubSubSChan := make(chan rpcclient.RpcClientConnection, 1)
|
||||
@@ -858,11 +883,11 @@ func main() {
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
|
||||
// Start ServiceManager
|
||||
srvManager := servmanager.NewServiceManager(cfg, dm, exitChan, cacheDoneChan)
|
||||
srvManager := servmanager.NewServiceManager(cfg, dm, exitChan, cacheS)
|
||||
|
||||
// Start rater service
|
||||
if cfg.RALsEnabled {
|
||||
go startRater(internalRaterChan, cacheDoneChan, internalThresholdSChan,
|
||||
go startRater(internalRaterChan, cacheS, internalThresholdSChan,
|
||||
internalCdrStatSChan, internalStatSChan,
|
||||
internalPubSubSChan, internalAttributeSChan,
|
||||
internalUserSChan, internalAliaseSChan,
|
||||
@@ -934,28 +959,32 @@ func main() {
|
||||
go startUsersServer(internalUserSChan, dm, server, exitChan)
|
||||
}
|
||||
// Start FilterS
|
||||
go startFilterService(filterSChan, internalStatSChan, cfg, dm, exitChan)
|
||||
go startFilterService(filterSChan, cacheS, internalStatSChan, cfg, dm, exitChan)
|
||||
|
||||
if cfg.AttributeSCfg().Enabled {
|
||||
go startAttributeService(internalAttributeSChan, cfg, dm, server, exitChan, filterSChan)
|
||||
go startAttributeService(internalAttributeSChan, cacheS,
|
||||
cfg, dm, server, exitChan, filterSChan)
|
||||
}
|
||||
|
||||
// Start RL service
|
||||
if cfg.ResourceSCfg().Enabled {
|
||||
go startResourceService(internalRsChan,
|
||||
go startResourceService(internalRsChan, cacheS,
|
||||
internalThresholdSChan, cfg, dm, server, exitChan, filterSChan)
|
||||
}
|
||||
|
||||
if cfg.StatSCfg().Enabled {
|
||||
go startStatService(internalStatSChan, internalThresholdSChan, cfg, dm, server, exitChan, filterSChan)
|
||||
go startStatService(internalStatSChan, cacheS,
|
||||
internalThresholdSChan, cfg, dm, server, exitChan, filterSChan)
|
||||
}
|
||||
|
||||
if cfg.ThresholdSCfg().Enabled {
|
||||
go startThresholdService(internalThresholdSChan, cfg, dm, server, exitChan, filterSChan)
|
||||
go startThresholdService(internalThresholdSChan, cacheS,
|
||||
cfg, dm, server, exitChan, filterSChan)
|
||||
}
|
||||
|
||||
if cfg.SupplierSCfg().Enabled {
|
||||
go startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan,
|
||||
go startSupplierService(internalSupplierSChan, cacheS,
|
||||
internalRsChan, internalStatSChan,
|
||||
cfg, dm, server, exitChan, filterSChan)
|
||||
}
|
||||
|
||||
|
||||
@@ -30,95 +30,30 @@ import (
|
||||
)
|
||||
|
||||
// Starts rater and reports on chan
|
||||
func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{},
|
||||
func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS,
|
||||
internalThdSChan, internalCdrStatSChan, internalStatSChan, internalPubSubSChan,
|
||||
internalAttributeSChan, internalUserSChan, internalAliaseSChan chan rpcclient.RpcClientConnection,
|
||||
serviceManager *servmanager.ServiceManager, server *utils.Server,
|
||||
dm *engine.DataManager, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, stopHandled *bool, exitChan chan bool) {
|
||||
var waitTasks []chan struct{}
|
||||
cacheCfg := cfg.CacheCfg()
|
||||
//Cache load
|
||||
cacheTaskChan := make(chan struct{})
|
||||
waitTasks = append(waitTasks, cacheTaskChan)
|
||||
go func() {
|
||||
go func() { //Wait for cache load
|
||||
defer close(cacheTaskChan)
|
||||
var dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rspIDs, resIDs, stqIDs, stqpIDs, thIDs, thpIDs, fltrIDs, sppIDs, alsPrfIDs []string
|
||||
if cCfg, has := cacheCfg[utils.CacheDestinations]; !has || !cCfg.Precache {
|
||||
dstIDs = make([]string, 0) // Don't cache any
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheReverseDestinations]; !has || !cCfg.Precache {
|
||||
rvDstIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheRatingPlans]; !has || !cCfg.Precache {
|
||||
rplIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheRatingProfiles]; !has || !cCfg.Precache {
|
||||
rpfIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheActions]; !has || !cCfg.Precache {
|
||||
actIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheActionPlans]; !has || !cCfg.Precache {
|
||||
aplIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheAccountActionPlans]; !has || !cCfg.Precache {
|
||||
aapIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheActionTriggers]; !has || !cCfg.Precache {
|
||||
atrgIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheSharedGroups]; !has || !cCfg.Precache {
|
||||
sgIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheLCRRules]; !has || !cCfg.Precache {
|
||||
lcrIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheDerivedChargers]; !has || !cCfg.Precache {
|
||||
dcIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheAliases]; !has || !cCfg.Precache {
|
||||
alsIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheReverseAliases]; !has || !cCfg.Precache {
|
||||
rvAlsIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheResourceProfiles]; !has || !cCfg.Precache {
|
||||
rspIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheResources]; !has || !cCfg.Precache {
|
||||
resIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheStatQueues]; !has || !cCfg.Precache {
|
||||
stqIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheStatQueueProfiles]; !has || !cCfg.Precache {
|
||||
stqpIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheThresholds]; !has || !cCfg.Precache {
|
||||
thIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheThresholdProfiles]; !has || !cCfg.Precache {
|
||||
thpIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheFilters]; !has || !cCfg.Precache {
|
||||
fltrIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheSupplierProfiles]; !has || !cCfg.Precache {
|
||||
sppIDs = make([]string, 0)
|
||||
}
|
||||
if cCfg, has := cacheCfg[utils.CacheAttributeProfiles]; !has || !cCfg.Precache {
|
||||
alsPrfIDs = make([]string, 0)
|
||||
}
|
||||
|
||||
// ToDo: Add here timings
|
||||
if err := dm.LoadDataDBCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs,
|
||||
atrgIDs, sgIDs, lcrIDs, dcIDs, alsIDs, rvAlsIDs, rspIDs, resIDs, stqIDs,
|
||||
stqpIDs, thIDs, thpIDs, fltrIDs, sppIDs, alsPrfIDs); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<RALs> Cache rating error: %s", err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
cacheDoneChan <- struct{}{}
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheDestinations)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheReverseDestinations)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheRatingPlans)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheRatingProfiles)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheActions)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheActionPlans)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheAccountActionPlans)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheActionTriggers)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheActionTriggers)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheSharedGroups)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheLCRRules)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheDerivedChargers)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheAliases)
|
||||
<-cacheS.GetPrecacheChannel(utils.CacheReverseAliases)
|
||||
}()
|
||||
|
||||
var thdS *rpcclient.RpcClientPool
|
||||
|
||||
@@ -20,6 +20,7 @@ package engine
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/ltcache"
|
||||
)
|
||||
|
||||
@@ -29,6 +30,33 @@ func init() {
|
||||
InitCache(nil)
|
||||
}
|
||||
|
||||
var precachedPartitions = []string{
|
||||
utils.CacheDestinations,
|
||||
utils.CacheReverseDestinations,
|
||||
utils.CacheRatingPlans,
|
||||
utils.CacheRatingProfiles,
|
||||
utils.CacheLCRRules,
|
||||
utils.CacheActions,
|
||||
utils.CacheActionPlans,
|
||||
utils.CacheAccountActionPlans,
|
||||
utils.CacheActionTriggers,
|
||||
utils.CacheSharedGroups,
|
||||
utils.CacheAliases,
|
||||
utils.CacheReverseAliases,
|
||||
utils.CacheDerivedChargers,
|
||||
utils.CacheResourceProfiles,
|
||||
utils.CacheResources,
|
||||
utils.CacheEventResources,
|
||||
utils.CacheTimings,
|
||||
utils.CacheStatQueueProfiles,
|
||||
utils.CacheStatQueues,
|
||||
utils.CacheThresholdProfiles,
|
||||
utils.CacheThresholds,
|
||||
utils.CacheFilters,
|
||||
utils.CacheSupplierProfiles,
|
||||
utils.CacheAttributeProfiles,
|
||||
}
|
||||
|
||||
// InitCache will instantiate the cache with specific or default configuraiton
|
||||
func InitCache(cfg config.CacheConfig) {
|
||||
if cfg == nil {
|
||||
@@ -37,19 +65,46 @@ func InitCache(cfg config.CacheConfig) {
|
||||
Cache = ltcache.NewTransCache(cfg.AsTransCacheConfig())
|
||||
}
|
||||
|
||||
// NewCacheS initializes the Cache service
|
||||
func NewCacheS(cfg *config.CGRConfig, dm *DataManager) (c *CacheS) {
|
||||
InitCache(cfg.CacheCfg()) // make sure we use the same config as package shared one
|
||||
InitCache(cfg.CacheCfg()) // to make sure we start with correct config
|
||||
c = &CacheS{cfg: cfg, dm: dm,
|
||||
cItems: make(map[string]chan struct{})}
|
||||
for k := range cfg.CacheCfg() {
|
||||
c.cItems[k] = make(chan struct{})
|
||||
pcItems: make(map[string]chan struct{})}
|
||||
for cacheID := range cfg.CacheCfg() {
|
||||
if !utils.IsSliceMember(precachedPartitions, cacheID) {
|
||||
continue
|
||||
}
|
||||
c.pcItems[cacheID] = make(chan struct{})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// CacheS deals with cache preload and other cache related tasks
|
||||
// CacheS deals with cache preload and other cache related tasks/APIs
|
||||
type CacheS struct {
|
||||
cfg *config.CGRConfig
|
||||
dm *DataManager
|
||||
cItems map[string]chan struct{} // signal precaching done
|
||||
cfg *config.CGRConfig
|
||||
dm *DataManager
|
||||
pcItems map[string]chan struct{} // signal precaching
|
||||
}
|
||||
|
||||
// GetChannel returns the channel used to signal precaching
|
||||
func (chS *CacheS) GetPrecacheChannel(chID string) chan struct{} {
|
||||
return chS.pcItems[chID]
|
||||
}
|
||||
|
||||
// Precache loads data from DataDB into cache at engine start
|
||||
func (chS *CacheS) Precache() (err error) {
|
||||
for cacheID, cacheCfg := range chS.cfg.CacheCfg() {
|
||||
if !utils.IsSliceMember(precachedPartitions, cacheID) {
|
||||
continue
|
||||
}
|
||||
if cacheCfg.Precache {
|
||||
if err = chS.dm.CacheDataFromDB(
|
||||
utils.CacheInstanceToPrefix[cacheID], nil,
|
||||
false); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
close(chS.pcItems[cacheID])
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -125,7 +125,8 @@ func (dm *DataManager) PreloadCacheForPrefix(prefix string) error {
|
||||
}
|
||||
|
||||
func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached bool) (err error) {
|
||||
if !utils.IsSliceMember([]string{utils.DESTINATION_PREFIX,
|
||||
if !utils.IsSliceMember([]string{
|
||||
utils.DESTINATION_PREFIX,
|
||||
utils.REVERSE_DESTINATION_PREFIX,
|
||||
utils.RATING_PLAN_PREFIX,
|
||||
utils.RATING_PROFILE_PREFIX,
|
||||
|
||||
2
glide.lock
generated
2
glide.lock
generated
@@ -16,7 +16,7 @@ imports:
|
||||
- name: github.com/cgrates/kamevapi
|
||||
version: e4dfe7d6cb5bb0872111fe7d61af0e1a19eda485
|
||||
- name: github.com/cgrates/ltcache
|
||||
version: 9416f986a2dc5f0ea5cb2abd5bc8c5fc99355280
|
||||
version: 3aface1319c71e5050445fc659361a59bbeeedff
|
||||
- name: github.com/cgrates/osipsdagram
|
||||
version: 3d6beed663452471dec3ca194137a30d379d9e8f
|
||||
- name: github.com/cgrates/radigo
|
||||
|
||||
@@ -31,8 +31,10 @@ import (
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
func NewServiceManager(cfg *config.CGRConfig, dm *engine.DataManager, engineShutdown chan bool, cacheDoneChan chan struct{}) *ServiceManager {
|
||||
return &ServiceManager{cfg: cfg, dm: dm, engineShutdown: engineShutdown, cacheDoneChan: cacheDoneChan}
|
||||
func NewServiceManager(cfg *config.CGRConfig, dm *engine.DataManager,
|
||||
engineShutdown chan bool, cacheS *engine.CacheS) *ServiceManager {
|
||||
return &ServiceManager{cfg: cfg, dm: dm,
|
||||
engineShutdown: engineShutdown, cacheS: cacheS}
|
||||
}
|
||||
|
||||
// ServiceManager handles service management ran by the engine
|
||||
@@ -41,7 +43,7 @@ type ServiceManager struct {
|
||||
cfg *config.CGRConfig
|
||||
dm *engine.DataManager
|
||||
engineShutdown chan bool
|
||||
cacheDoneChan chan struct{} // Wait for cache to load
|
||||
cacheS *engine.CacheS
|
||||
sched *scheduler.Scheduler
|
||||
rpcChans map[string]chan rpcclient.RpcClientConnection // services expected to start
|
||||
rpcServices map[string]rpcclient.RpcClientConnection // services started
|
||||
@@ -58,8 +60,7 @@ func (srvMngr *ServiceManager) StartScheduler(waitCache bool) error {
|
||||
"the scheduler is already running")
|
||||
}
|
||||
if waitCache { // Wait for cache to load data before starting
|
||||
cacheDone := <-srvMngr.cacheDoneChan
|
||||
srvMngr.cacheDoneChan <- cacheDone
|
||||
<-srvMngr.cacheS.GetPrecacheChannel(utils.CacheActionPlans) // wait for ActionPlans to be cached
|
||||
}
|
||||
utils.Logger.Info("<ServiceManager> Starting CGRateS Scheduler.")
|
||||
sched := scheduler.NewScheduler(srvMngr.dm)
|
||||
|
||||
@@ -521,6 +521,7 @@ const (
|
||||
DiameterAgent = "DiameterAgent"
|
||||
Error = "Error"
|
||||
MetaCGRReply = "*cgrReply"
|
||||
CacheS = "CacheS"
|
||||
)
|
||||
|
||||
//MetaMetrics
|
||||
|
||||
Reference in New Issue
Block a user