mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add TransCache inside CacheS
This commit is contained in:
committed by
Dan Christian Bogos
parent
1ade35d0d0
commit
81b52eef65
@@ -1744,7 +1744,7 @@ func TestAgReqSetFieldsInCache(t *testing.T) {
|
||||
data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
|
||||
dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil)
|
||||
filterS := engine.NewFilterS(cfg, nil, dm)
|
||||
engine.InitCache(cfg.CacheCfg())
|
||||
engine.NewCacheS(cfg, dm)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
|
||||
agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false)
|
||||
|
||||
@@ -1785,7 +1785,7 @@ func TestAgReqSetFieldsInCacheWithTimeOut(t *testing.T) {
|
||||
filterS := engine.NewFilterS(cfg, nil, dm)
|
||||
|
||||
cfg.CacheCfg()[utils.CacheUCH].TTL = 1 * time.Second
|
||||
engine.InitCache(cfg.CacheCfg())
|
||||
engine.Cache = engine.NewCacheS(cfg, dm)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil)
|
||||
agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false)
|
||||
|
||||
|
||||
@@ -522,7 +522,7 @@ func (da *DiameterAgent) sendASR(originID string, reply *string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// V1ReAuthorize sends a rar meseage to diameter client
|
||||
// V1ReAuthorize sends a rar message to diameter client
|
||||
func (da *DiameterAgent) V1ReAuthorize(originID string, reply *string) (err error) {
|
||||
if originID == "" {
|
||||
utils.Logger.Info(
|
||||
|
||||
@@ -808,7 +808,7 @@ func testApierLoadAccountActions(t *testing.T) {
|
||||
if err := rater.Call(utils.CacheSv1GetCacheStats, new(utils.AttrCacheIDsWithArgDispatcher), &rcvStats); err != nil {
|
||||
t.Error("Got error on CacheSv1.GetCacheStats: ", err.Error())
|
||||
} else if !reflect.DeepEqual(expectedStats, rcvStats) {
|
||||
t.Errorf("Calling CacheSv1.GetCacheStats expected: %+v, received: %+v", utils.ToJSON(expectedStats), utils.ToJSON(rcvStats))
|
||||
t.Errorf("Calling CacheSv1.GetCacheStats expected: %+v,\n received: %+v", utils.ToJSON(expectedStats), utils.ToJSON(rcvStats))
|
||||
}
|
||||
var reply string
|
||||
aa1 := &utils.TPAccountActions{TPid: utils.TEST_SQL, LoadId: utils.TEST_SQL, Tenant: "cgrates.org", Account: "1001"}
|
||||
@@ -824,7 +824,7 @@ func testApierLoadAccountActions(t *testing.T) {
|
||||
if err := rater.Call(utils.CacheSv1GetCacheStats, new(utils.AttrCacheIDsWithArgDispatcher), &rcvStats); err != nil {
|
||||
t.Error("Got error on CacheSv1.GetCacheStats: ", err.Error())
|
||||
} else if !reflect.DeepEqual(expectedStats, rcvStats) {
|
||||
t.Errorf("Calling CacheSv1.GetCacheStats expected: %+v, received: %+v", utils.ToJSON(expectedStats), utils.ToJSON(rcvStats))
|
||||
t.Errorf("Calling CacheSv1.GetCacheStats expected: %+v, \n received: %+v", utils.ToJSON(expectedStats), utils.ToJSON(rcvStats))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -310,6 +310,10 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) {
|
||||
Items: 0,
|
||||
Groups: 0,
|
||||
},
|
||||
utils.CacheUCH: {
|
||||
Items: 0,
|
||||
Groups: 0,
|
||||
},
|
||||
}
|
||||
if err := precacheRPC.Call(utils.CacheSv1GetCacheStats, args, &reply); err != nil {
|
||||
t.Error(err.Error())
|
||||
|
||||
@@ -462,6 +462,7 @@ func main() {
|
||||
|
||||
// init CacheS
|
||||
cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan)
|
||||
engine.SetCache(cacheS)
|
||||
|
||||
// init GuardianSv1
|
||||
initGuardianSv1(internalGuardianSChan, server)
|
||||
|
||||
@@ -194,6 +194,7 @@ func testDspChcPrecacheStatus(t *testing.T) {
|
||||
utils.CacheRPCConnections: utils.MetaReady,
|
||||
utils.CacheRPCResponses: utils.MetaReady,
|
||||
utils.CacheRatingProfilesTmp: utils.MetaReady,
|
||||
utils.CacheUCH: utils.MetaReady,
|
||||
}
|
||||
|
||||
if err := dispEngine.RPC.Call(utils.CacheSv1PrecacheStatus, utils.AttrCacheIDsWithArgDispatcher{
|
||||
|
||||
152
engine/caches.go
152
engine/caches.go
@@ -28,27 +28,23 @@ import (
|
||||
"github.com/cgrates/ltcache"
|
||||
)
|
||||
|
||||
// Cache is the global cache used
|
||||
var Cache *ltcache.TransCache
|
||||
var Cache *CacheS
|
||||
|
||||
func init() {
|
||||
InitCache(nil)
|
||||
Cache = NewCacheS(config.CgrConfig(), nil)
|
||||
}
|
||||
|
||||
// InitCache will instantiate the cache with specific or default configuraiton
|
||||
func InitCache(cfg config.CacheCfg) {
|
||||
if cfg == nil {
|
||||
cfg = config.CgrConfig().CacheCfg()
|
||||
}
|
||||
cfg.AddTmpCaches()
|
||||
Cache = ltcache.NewTransCache(cfg.AsTransCacheConfig())
|
||||
//SetCache shared the cache from other subsystems
|
||||
func SetCache(chS *CacheS) {
|
||||
Cache = chS
|
||||
}
|
||||
|
||||
// NewCacheS initializes the Cache service and executes the precaching
|
||||
func NewCacheS(cfg *config.CGRConfig, dm *DataManager) (c *CacheS) {
|
||||
InitCache(cfg.CacheCfg()) // to make sure we start with correct config
|
||||
cfg.CacheCfg().AddTmpCaches()
|
||||
c = &CacheS{cfg: cfg, dm: dm,
|
||||
pcItems: make(map[string]chan struct{})}
|
||||
pcItems: make(map[string]chan struct{}),
|
||||
tCache: ltcache.NewTransCache(cfg.CacheCfg().AsTransCacheConfig())}
|
||||
for cacheID := range cfg.CacheCfg() {
|
||||
c.pcItems[cacheID] = make(chan struct{})
|
||||
}
|
||||
@@ -60,6 +56,58 @@ type CacheS struct {
|
||||
cfg *config.CGRConfig
|
||||
dm *DataManager
|
||||
pcItems map[string]chan struct{} // signal precaching
|
||||
tCache *ltcache.TransCache
|
||||
}
|
||||
|
||||
// Set is an exported method from TransCache
|
||||
func (chS *CacheS) Set(chID, itmID string, value interface{},
|
||||
groupIDs []string, commit bool, transID string) {
|
||||
chS.tCache.Set(chID, itmID, value, groupIDs, commit, transID)
|
||||
}
|
||||
|
||||
// HasItem is an exported method from TransCache
|
||||
func (chS *CacheS) HasItem(chID, itmID string) (has bool) {
|
||||
return chS.tCache.HasItem(chID, itmID)
|
||||
}
|
||||
|
||||
// Get is an exported method from TransCache
|
||||
func (chS *CacheS) Get(chID, itmID string) (interface{}, bool) {
|
||||
return chS.tCache.Get(chID, itmID)
|
||||
}
|
||||
|
||||
// GetItemIDs is an exported method from TransCache
|
||||
func (chS *CacheS) GetItemIDs(chID, prfx string) (itmIDs []string) {
|
||||
return chS.tCache.GetItemIDs(chID, prfx)
|
||||
}
|
||||
|
||||
// Remove is an exported method from TransCache
|
||||
func (chS *CacheS) Remove(chID, itmID string, commit bool, transID string) {
|
||||
chS.tCache.Remove(chID, itmID, commit, transID)
|
||||
}
|
||||
|
||||
// Clear is an exported method from TransCache
|
||||
func (chS *CacheS) Clear(chIDs []string) {
|
||||
chS.tCache.Clear(chIDs)
|
||||
}
|
||||
|
||||
// BeginTransaction is an exported method from TransCache
|
||||
func (chS *CacheS) BeginTransaction() string {
|
||||
return chS.tCache.BeginTransaction()
|
||||
}
|
||||
|
||||
// RollbackTransaction is an exported method from TransCache
|
||||
func (chS *CacheS) RollbackTransaction(transID string) {
|
||||
chS.tCache.RollbackTransaction(transID)
|
||||
}
|
||||
|
||||
// RollbackTransaction is an exported method from TransCache
|
||||
func (chS *CacheS) CommitTransaction(transID string) {
|
||||
chS.tCache.CommitTransaction(transID)
|
||||
}
|
||||
|
||||
// RollbackTransaction is an exported method from TransCache
|
||||
func (chS *CacheS) GetCloned(chID, itmID string) (cln interface{}, err error) {
|
||||
return chS.tCache.GetCloned(chID, itmID)
|
||||
}
|
||||
|
||||
// GetPrecacheChannel returns the channel used to signal precaching
|
||||
@@ -110,7 +158,7 @@ func (chS *CacheS) Call(serviceMethod string, args interface{}, reply interface{
|
||||
|
||||
func (chS *CacheS) V1GetItemIDs(args *utils.ArgsGetCacheItemIDsWithArgDispatcher,
|
||||
reply *[]string) (err error) {
|
||||
itmIDs := Cache.GetItemIDs(args.CacheID, args.ItemIDPrefix)
|
||||
itmIDs := chS.tCache.GetItemIDs(args.CacheID, args.ItemIDPrefix)
|
||||
if len(itmIDs) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
@@ -120,13 +168,13 @@ func (chS *CacheS) V1GetItemIDs(args *utils.ArgsGetCacheItemIDsWithArgDispatcher
|
||||
|
||||
func (chS *CacheS) V1HasItem(args *utils.ArgsGetCacheItemWithArgDispatcher,
|
||||
reply *bool) (err error) {
|
||||
*reply = Cache.HasItem(args.CacheID, args.ItemID)
|
||||
*reply = chS.tCache.HasItem(args.CacheID, args.ItemID)
|
||||
return
|
||||
}
|
||||
|
||||
func (chS *CacheS) V1GetItemExpiryTime(args *utils.ArgsGetCacheItemWithArgDispatcher,
|
||||
reply *time.Time) (err error) {
|
||||
expTime, has := Cache.GetItemExpiryTime(args.CacheID, args.ItemID)
|
||||
expTime, has := chS.tCache.GetItemExpiryTime(args.CacheID, args.ItemID)
|
||||
if !has {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
@@ -136,21 +184,21 @@ func (chS *CacheS) V1GetItemExpiryTime(args *utils.ArgsGetCacheItemWithArgDispat
|
||||
|
||||
func (chS *CacheS) V1RemoveItem(args *utils.ArgsGetCacheItemWithArgDispatcher,
|
||||
reply *string) (err error) {
|
||||
Cache.Remove(args.CacheID, args.ItemID, true, utils.NonTransactional)
|
||||
chS.tCache.Remove(args.CacheID, args.ItemID, true, utils.NonTransactional)
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
func (chS *CacheS) V1Clear(args *utils.AttrCacheIDsWithArgDispatcher,
|
||||
reply *string) (err error) {
|
||||
Cache.Clear(args.CacheIDs)
|
||||
chS.tCache.Clear(args.CacheIDs)
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
func (chS *CacheS) V1GetCacheStats(args *utils.AttrCacheIDsWithArgDispatcher,
|
||||
rply *map[string]*ltcache.CacheStats) (err error) {
|
||||
cs := Cache.GetCacheStats(args.CacheIDs)
|
||||
cs := chS.tCache.GetCacheStats(args.CacheIDs)
|
||||
*rply = cs
|
||||
return
|
||||
}
|
||||
@@ -177,22 +225,22 @@ func (chS *CacheS) V1PrecacheStatus(args *utils.AttrCacheIDsWithArgDispatcher, r
|
||||
|
||||
func (chS *CacheS) V1HasGroup(args *utils.ArgsGetGroupWithArgDispatcher,
|
||||
rply *bool) (err error) {
|
||||
*rply = Cache.HasGroup(args.CacheID, args.GroupID)
|
||||
*rply = chS.tCache.HasGroup(args.CacheID, args.GroupID)
|
||||
return
|
||||
}
|
||||
|
||||
func (chS *CacheS) V1GetGroupItemIDs(args *utils.ArgsGetGroupWithArgDispatcher,
|
||||
rply *[]string) (err error) {
|
||||
if has := Cache.HasGroup(args.CacheID, args.GroupID); !has {
|
||||
if has := chS.tCache.HasGroup(args.CacheID, args.GroupID); !has {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
*rply = Cache.GetGroupItemIDs(args.CacheID, args.GroupID)
|
||||
*rply = chS.tCache.GetGroupItemIDs(args.CacheID, args.GroupID)
|
||||
return
|
||||
}
|
||||
|
||||
func (chS *CacheS) V1RemoveGroup(args *utils.ArgsGetGroupWithArgDispatcher,
|
||||
rply *string) (err error) {
|
||||
Cache.RemoveGroup(args.CacheID, args.GroupID, true, utils.NonTransactional)
|
||||
chS.tCache.RemoveGroup(args.CacheID, args.GroupID, true, utils.NonTransactional)
|
||||
*rply = utils.OK
|
||||
return
|
||||
}
|
||||
@@ -206,7 +254,7 @@ func (chS *CacheS) reloadCache(chID string, IDs *[]string) error {
|
||||
|
||||
func (chS *CacheS) V1ReloadCache(attrs utils.AttrReloadCacheWithArgDispatcher, reply *string) (err error) {
|
||||
if attrs.FlushAll {
|
||||
Cache.Clear(nil)
|
||||
chS.tCache.Clear(nil)
|
||||
return
|
||||
}
|
||||
// Reload Destinations
|
||||
@@ -305,7 +353,7 @@ func (chS *CacheS) V1ReloadCache(attrs utils.AttrReloadCacheWithArgDispatcher, r
|
||||
}
|
||||
cacheLoadIDs := populateCacheLoadIDs(loadIDs, attrs.AttrReloadCache)
|
||||
for key, val := range cacheLoadIDs {
|
||||
Cache.Set(utils.CacheLoadIDs, key, val, nil,
|
||||
chS.tCache.Set(utils.CacheLoadIDs, key, val, nil,
|
||||
cacheCommit(utils.NonTransactional), utils.NonTransactional)
|
||||
}
|
||||
|
||||
@@ -322,7 +370,7 @@ func toStringSlice(in *[]string) []string {
|
||||
|
||||
func (chS *CacheS) V1LoadCache(args utils.AttrReloadCacheWithArgDispatcher, reply *string) (err error) {
|
||||
if args.FlushAll {
|
||||
Cache.Clear(nil)
|
||||
chS.tCache.Clear(nil)
|
||||
}
|
||||
if err := chS.dm.LoadDataDBCache(
|
||||
toStringSlice(args.DestinationIDs),
|
||||
@@ -360,51 +408,51 @@ func (chS *CacheS) V1LoadCache(args utils.AttrReloadCacheWithArgDispatcher, repl
|
||||
}
|
||||
cacheLoadIDs := populateCacheLoadIDs(loadIDs, args.AttrReloadCache)
|
||||
for key, val := range cacheLoadIDs {
|
||||
Cache.Set(utils.CacheLoadIDs, key, val, nil,
|
||||
chS.tCache.Set(utils.CacheLoadIDs, key, val, nil,
|
||||
cacheCommit(utils.NonTransactional), utils.NonTransactional)
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
func flushCache(chID string, IDs *[]string) {
|
||||
func (chS *CacheS) flushCache(chID string, IDs *[]string) {
|
||||
if IDs == nil {
|
||||
Cache.Clear([]string{chID})
|
||||
chS.tCache.Clear([]string{chID})
|
||||
return
|
||||
}
|
||||
for _, key := range *IDs {
|
||||
Cache.Remove(chID, key, true, utils.NonTransactional)
|
||||
chS.tCache.Remove(chID, key, true, utils.NonTransactional)
|
||||
}
|
||||
}
|
||||
|
||||
// V1FlushCache wipes out cache for a prefix or completely
|
||||
func (chS *CacheS) V1FlushCache(args utils.AttrReloadCacheWithArgDispatcher, reply *string) (err error) {
|
||||
if args.FlushAll {
|
||||
Cache.Clear(nil)
|
||||
chS.tCache.Clear(nil)
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
flushCache(utils.CacheDestinations, args.DestinationIDs)
|
||||
flushCache(utils.CacheReverseDestinations, args.ReverseDestinationIDs)
|
||||
flushCache(utils.CacheRatingPlans, args.RatingPlanIDs)
|
||||
flushCache(utils.CacheRatingProfiles, args.RatingProfileIDs)
|
||||
flushCache(utils.CacheActions, args.ActionIDs)
|
||||
flushCache(utils.CacheActionPlans, args.ActionPlanIDs)
|
||||
flushCache(utils.CacheActionTriggers, args.ActionTriggerIDs)
|
||||
flushCache(utils.CacheSharedGroups, args.SharedGroupIDs)
|
||||
flushCache(utils.CacheResourceProfiles, args.ResourceProfileIDs)
|
||||
flushCache(utils.CacheResources, args.ResourceIDs)
|
||||
flushCache(utils.CacheStatQueues, args.StatsQueueIDs)
|
||||
flushCache(utils.CacheThresholdProfiles, args.StatsQueueProfileIDs)
|
||||
flushCache(utils.CacheThresholds, args.ThresholdIDs)
|
||||
flushCache(utils.CacheThresholdProfiles, args.ThresholdProfileIDs)
|
||||
flushCache(utils.CacheFilters, args.FilterIDs)
|
||||
flushCache(utils.CacheSupplierProfiles, args.SupplierProfileIDs)
|
||||
flushCache(utils.CacheAttributeProfiles, args.AttributeProfileIDs)
|
||||
flushCache(utils.CacheChargerProfiles, args.ChargerProfileIDs)
|
||||
flushCache(utils.CacheDispatcherProfiles, args.DispatcherProfileIDs)
|
||||
flushCache(utils.CacheDispatcherHosts, args.DispatcherHostIDs)
|
||||
flushCache(utils.CacheDispatcherRoutes, args.DispatcherRoutesIDs)
|
||||
chS.flushCache(utils.CacheDestinations, args.DestinationIDs)
|
||||
chS.flushCache(utils.CacheReverseDestinations, args.ReverseDestinationIDs)
|
||||
chS.flushCache(utils.CacheRatingPlans, args.RatingPlanIDs)
|
||||
chS.flushCache(utils.CacheRatingProfiles, args.RatingProfileIDs)
|
||||
chS.flushCache(utils.CacheActions, args.ActionIDs)
|
||||
chS.flushCache(utils.CacheActionPlans, args.ActionPlanIDs)
|
||||
chS.flushCache(utils.CacheActionTriggers, args.ActionTriggerIDs)
|
||||
chS.flushCache(utils.CacheSharedGroups, args.SharedGroupIDs)
|
||||
chS.flushCache(utils.CacheResourceProfiles, args.ResourceProfileIDs)
|
||||
chS.flushCache(utils.CacheResources, args.ResourceIDs)
|
||||
chS.flushCache(utils.CacheStatQueues, args.StatsQueueIDs)
|
||||
chS.flushCache(utils.CacheThresholdProfiles, args.StatsQueueProfileIDs)
|
||||
chS.flushCache(utils.CacheThresholds, args.ThresholdIDs)
|
||||
chS.flushCache(utils.CacheThresholdProfiles, args.ThresholdProfileIDs)
|
||||
chS.flushCache(utils.CacheFilters, args.FilterIDs)
|
||||
chS.flushCache(utils.CacheSupplierProfiles, args.SupplierProfileIDs)
|
||||
chS.flushCache(utils.CacheAttributeProfiles, args.AttributeProfileIDs)
|
||||
chS.flushCache(utils.CacheChargerProfiles, args.ChargerProfileIDs)
|
||||
chS.flushCache(utils.CacheDispatcherProfiles, args.DispatcherProfileIDs)
|
||||
chS.flushCache(utils.CacheDispatcherHosts, args.DispatcherHostIDs)
|
||||
chS.flushCache(utils.CacheDispatcherRoutes, args.DispatcherRoutesIDs)
|
||||
//get loadIDs for all types
|
||||
loadIDs, err := chS.dm.GetItemLoadIDs(utils.EmptyString, false)
|
||||
if err != nil {
|
||||
@@ -412,7 +460,7 @@ func (chS *CacheS) V1FlushCache(args utils.AttrReloadCacheWithArgDispatcher, rep
|
||||
}
|
||||
cacheLoadIDs := populateCacheLoadIDs(loadIDs, args.AttrReloadCache)
|
||||
for key, val := range cacheLoadIDs {
|
||||
Cache.Set(utils.CacheLoadIDs, key, val, nil,
|
||||
chS.tCache.Set(utils.CacheLoadIDs, key, val, nil,
|
||||
cacheCommit(utils.NonTransactional), utils.NonTransactional)
|
||||
}
|
||||
*reply = utils.OK
|
||||
|
||||
@@ -621,5 +621,9 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats {
|
||||
Items: 0,
|
||||
Groups: 0,
|
||||
},
|
||||
utils.CacheUCH: {
|
||||
Items: 0,
|
||||
Groups: 0,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user