diff --git a/agents/agentreq_test.go b/agents/agentreq_test.go index da7bd10a1..2e81e819b 100644 --- a/agents/agentreq_test.go +++ b/agents/agentreq_test.go @@ -1744,7 +1744,7 @@ func TestAgReqSetFieldsInCache(t *testing.T) { data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - engine.InitCache(cfg.CacheCfg()) + engine.NewCacheS(cfg, dm) agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false) @@ -1785,7 +1785,7 @@ func TestAgReqSetFieldsInCacheWithTimeOut(t *testing.T) { filterS := engine.NewFilterS(cfg, nil, dm) cfg.CacheCfg()[utils.CacheUCH].TTL = 1 * time.Second - engine.InitCache(cfg.CacheCfg()) + engine.Cache = engine.NewCacheS(cfg, dm) agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) agReq.CGRRequest.Set([]string{utils.Account}, "1001", false, false) diff --git a/agents/diamagent.go b/agents/diamagent.go index 63873526b..00a76e9c3 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -522,7 +522,7 @@ func (da *DiameterAgent) sendASR(originID string, reply *string) (err error) { return } -// V1ReAuthorize sends a rar meseage to diameter client +// V1ReAuthorize sends a rar message to diameter client func (da *DiameterAgent) V1ReAuthorize(originID string, reply *string) (err error) { if originID == "" { utils.Logger.Info( diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index eb4a31760..0fe209d37 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -808,7 +808,7 @@ func testApierLoadAccountActions(t *testing.T) { if err := rater.Call(utils.CacheSv1GetCacheStats, new(utils.AttrCacheIDsWithArgDispatcher), &rcvStats); err != nil { t.Error("Got error on CacheSv1.GetCacheStats: ", err.Error()) } else if !reflect.DeepEqual(expectedStats, rcvStats) { - t.Errorf("Calling CacheSv1.GetCacheStats expected: %+v, received: %+v", utils.ToJSON(expectedStats), utils.ToJSON(rcvStats)) + t.Errorf("Calling CacheSv1.GetCacheStats expected: %+v,\n received: %+v", utils.ToJSON(expectedStats), utils.ToJSON(rcvStats)) } var reply string aa1 := &utils.TPAccountActions{TPid: utils.TEST_SQL, LoadId: utils.TEST_SQL, Tenant: "cgrates.org", Account: "1001"} @@ -824,7 +824,7 @@ func testApierLoadAccountActions(t *testing.T) { if err := rater.Call(utils.CacheSv1GetCacheStats, new(utils.AttrCacheIDsWithArgDispatcher), &rcvStats); err != nil { t.Error("Got error on CacheSv1.GetCacheStats: ", err.Error()) } else if !reflect.DeepEqual(expectedStats, rcvStats) { - t.Errorf("Calling CacheSv1.GetCacheStats expected: %+v, received: %+v", utils.ToJSON(expectedStats), utils.ToJSON(rcvStats)) + t.Errorf("Calling CacheSv1.GetCacheStats expected: %+v, \n received: %+v", utils.ToJSON(expectedStats), utils.ToJSON(rcvStats)) } } diff --git a/apier/v1/precache_it_test.go b/apier/v1/precache_it_test.go index 7a14b7e8f..5026fc658 100644 --- a/apier/v1/precache_it_test.go +++ b/apier/v1/precache_it_test.go @@ -310,6 +310,10 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) { Items: 0, Groups: 0, }, + utils.CacheUCH: { + Items: 0, + Groups: 0, + }, } if err := precacheRPC.Call(utils.CacheSv1GetCacheStats, args, &reply); err != nil { t.Error(err.Error()) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 05a209825..35864e4c4 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -462,6 +462,7 @@ func main() { // init CacheS cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan) + engine.SetCache(cacheS) // init GuardianSv1 initGuardianSv1(internalGuardianSChan, server) diff --git a/dispatchers/caches_it_test.go b/dispatchers/caches_it_test.go index 743ce6bf7..f491ea5b3 100644 --- a/dispatchers/caches_it_test.go +++ b/dispatchers/caches_it_test.go @@ -194,6 +194,7 @@ func testDspChcPrecacheStatus(t *testing.T) { utils.CacheRPCConnections: utils.MetaReady, utils.CacheRPCResponses: utils.MetaReady, utils.CacheRatingProfilesTmp: utils.MetaReady, + utils.CacheUCH: utils.MetaReady, } if err := dispEngine.RPC.Call(utils.CacheSv1PrecacheStatus, utils.AttrCacheIDsWithArgDispatcher{ diff --git a/engine/caches.go b/engine/caches.go index 3a3a52e81..cd00c5900 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -28,27 +28,23 @@ import ( "github.com/cgrates/ltcache" ) -// Cache is the global cache used -var Cache *ltcache.TransCache +var Cache *CacheS func init() { - InitCache(nil) + Cache = NewCacheS(config.CgrConfig(), nil) } -// InitCache will instantiate the cache with specific or default configuraiton -func InitCache(cfg config.CacheCfg) { - if cfg == nil { - cfg = config.CgrConfig().CacheCfg() - } - cfg.AddTmpCaches() - Cache = ltcache.NewTransCache(cfg.AsTransCacheConfig()) +//SetCache shared the cache from other subsystems +func SetCache(chS *CacheS) { + Cache = chS } // NewCacheS initializes the Cache service and executes the precaching func NewCacheS(cfg *config.CGRConfig, dm *DataManager) (c *CacheS) { - InitCache(cfg.CacheCfg()) // to make sure we start with correct config + cfg.CacheCfg().AddTmpCaches() c = &CacheS{cfg: cfg, dm: dm, - pcItems: make(map[string]chan struct{})} + pcItems: make(map[string]chan struct{}), + tCache: ltcache.NewTransCache(cfg.CacheCfg().AsTransCacheConfig())} for cacheID := range cfg.CacheCfg() { c.pcItems[cacheID] = make(chan struct{}) } @@ -60,6 +56,58 @@ type CacheS struct { cfg *config.CGRConfig dm *DataManager pcItems map[string]chan struct{} // signal precaching + tCache *ltcache.TransCache +} + +// Set is an exported method from TransCache +func (chS *CacheS) Set(chID, itmID string, value interface{}, + groupIDs []string, commit bool, transID string) { + chS.tCache.Set(chID, itmID, value, groupIDs, commit, transID) +} + +// HasItem is an exported method from TransCache +func (chS *CacheS) HasItem(chID, itmID string) (has bool) { + return chS.tCache.HasItem(chID, itmID) +} + +// Get is an exported method from TransCache +func (chS *CacheS) Get(chID, itmID string) (interface{}, bool) { + return chS.tCache.Get(chID, itmID) +} + +// GetItemIDs is an exported method from TransCache +func (chS *CacheS) GetItemIDs(chID, prfx string) (itmIDs []string) { + return chS.tCache.GetItemIDs(chID, prfx) +} + +// Remove is an exported method from TransCache +func (chS *CacheS) Remove(chID, itmID string, commit bool, transID string) { + chS.tCache.Remove(chID, itmID, commit, transID) +} + +// Clear is an exported method from TransCache +func (chS *CacheS) Clear(chIDs []string) { + chS.tCache.Clear(chIDs) +} + +// BeginTransaction is an exported method from TransCache +func (chS *CacheS) BeginTransaction() string { + return chS.tCache.BeginTransaction() +} + +// RollbackTransaction is an exported method from TransCache +func (chS *CacheS) RollbackTransaction(transID string) { + chS.tCache.RollbackTransaction(transID) +} + +// RollbackTransaction is an exported method from TransCache +func (chS *CacheS) CommitTransaction(transID string) { + chS.tCache.CommitTransaction(transID) +} + +// RollbackTransaction is an exported method from TransCache +func (chS *CacheS) GetCloned(chID, itmID string) (cln interface{}, err error) { + return chS.tCache.GetCloned(chID, itmID) } // GetPrecacheChannel returns the channel used to signal precaching @@ -110,7 +158,7 @@ func (chS *CacheS) Call(serviceMethod string, args interface{}, reply interface{ func (chS *CacheS) V1GetItemIDs(args *utils.ArgsGetCacheItemIDsWithArgDispatcher, reply *[]string) (err error) { - itmIDs := Cache.GetItemIDs(args.CacheID, args.ItemIDPrefix) + itmIDs := chS.tCache.GetItemIDs(args.CacheID, args.ItemIDPrefix) if len(itmIDs) == 0 { return utils.ErrNotFound } @@ -120,13 +168,13 @@ func (chS *CacheS) V1GetItemIDs(args *utils.ArgsGetCacheItemIDsWithArgDispatcher func (chS *CacheS) V1HasItem(args *utils.ArgsGetCacheItemWithArgDispatcher, reply *bool) (err error) { - *reply = Cache.HasItem(args.CacheID, args.ItemID) + *reply = chS.tCache.HasItem(args.CacheID, args.ItemID) return } func (chS *CacheS) V1GetItemExpiryTime(args *utils.ArgsGetCacheItemWithArgDispatcher, reply *time.Time) (err error) { - expTime, has := Cache.GetItemExpiryTime(args.CacheID, args.ItemID) + expTime, has := chS.tCache.GetItemExpiryTime(args.CacheID, args.ItemID) if !has { return utils.ErrNotFound } @@ -136,21 +184,21 @@ func (chS *CacheS) V1GetItemExpiryTime(args *utils.ArgsGetCacheItemWithArgDispat func (chS *CacheS) V1RemoveItem(args *utils.ArgsGetCacheItemWithArgDispatcher, reply *string) (err error) { - Cache.Remove(args.CacheID, args.ItemID, true, utils.NonTransactional) + chS.tCache.Remove(args.CacheID, args.ItemID, true, utils.NonTransactional) *reply = utils.OK return } func (chS *CacheS) V1Clear(args *utils.AttrCacheIDsWithArgDispatcher, reply *string) (err error) { - Cache.Clear(args.CacheIDs) + chS.tCache.Clear(args.CacheIDs) *reply = utils.OK return } func (chS *CacheS) V1GetCacheStats(args *utils.AttrCacheIDsWithArgDispatcher, rply *map[string]*ltcache.CacheStats) (err error) { - cs := Cache.GetCacheStats(args.CacheIDs) + cs := chS.tCache.GetCacheStats(args.CacheIDs) *rply = cs return } @@ -177,22 +225,22 @@ func (chS *CacheS) V1PrecacheStatus(args *utils.AttrCacheIDsWithArgDispatcher, r func (chS *CacheS) V1HasGroup(args *utils.ArgsGetGroupWithArgDispatcher, rply *bool) (err error) { - *rply = Cache.HasGroup(args.CacheID, args.GroupID) + *rply = chS.tCache.HasGroup(args.CacheID, args.GroupID) return } func (chS *CacheS) V1GetGroupItemIDs(args *utils.ArgsGetGroupWithArgDispatcher, rply *[]string) (err error) { - if has := Cache.HasGroup(args.CacheID, args.GroupID); !has { + if has := chS.tCache.HasGroup(args.CacheID, args.GroupID); !has { return utils.ErrNotFound } - *rply = Cache.GetGroupItemIDs(args.CacheID, args.GroupID) + *rply = chS.tCache.GetGroupItemIDs(args.CacheID, args.GroupID) return } func (chS *CacheS) V1RemoveGroup(args *utils.ArgsGetGroupWithArgDispatcher, rply *string) (err error) { - Cache.RemoveGroup(args.CacheID, args.GroupID, true, utils.NonTransactional) + chS.tCache.RemoveGroup(args.CacheID, args.GroupID, true, utils.NonTransactional) *rply = utils.OK return } @@ -206,7 +254,7 @@ func (chS *CacheS) reloadCache(chID string, IDs *[]string) error { func (chS *CacheS) V1ReloadCache(attrs utils.AttrReloadCacheWithArgDispatcher, reply *string) (err error) { if attrs.FlushAll { - Cache.Clear(nil) + chS.tCache.Clear(nil) return } // Reload Destinations @@ -305,7 +353,7 @@ func (chS *CacheS) V1ReloadCache(attrs utils.AttrReloadCacheWithArgDispatcher, r } cacheLoadIDs := populateCacheLoadIDs(loadIDs, attrs.AttrReloadCache) for key, val := range cacheLoadIDs { - Cache.Set(utils.CacheLoadIDs, key, val, nil, + chS.tCache.Set(utils.CacheLoadIDs, key, val, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } @@ -322,7 +370,7 @@ func toStringSlice(in *[]string) []string { func (chS *CacheS) V1LoadCache(args utils.AttrReloadCacheWithArgDispatcher, reply *string) (err error) { if args.FlushAll { - Cache.Clear(nil) + chS.tCache.Clear(nil) } if err := chS.dm.LoadDataDBCache( toStringSlice(args.DestinationIDs), @@ -360,51 +408,51 @@ func (chS *CacheS) V1LoadCache(args utils.AttrReloadCacheWithArgDispatcher, repl } cacheLoadIDs := populateCacheLoadIDs(loadIDs, args.AttrReloadCache) for key, val := range cacheLoadIDs { - Cache.Set(utils.CacheLoadIDs, key, val, nil, + chS.tCache.Set(utils.CacheLoadIDs, key, val, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } *reply = utils.OK return nil } -func flushCache(chID string, IDs *[]string) { +func (chS *CacheS) flushCache(chID string, IDs *[]string) { if IDs == nil { - Cache.Clear([]string{chID}) + chS.tCache.Clear([]string{chID}) return } for _, key := range *IDs { - Cache.Remove(chID, key, true, utils.NonTransactional) + chS.tCache.Remove(chID, key, true, utils.NonTransactional) } } // V1FlushCache wipes out cache for a prefix or completely func (chS *CacheS) V1FlushCache(args utils.AttrReloadCacheWithArgDispatcher, reply *string) (err error) { if args.FlushAll { - Cache.Clear(nil) + chS.tCache.Clear(nil) *reply = utils.OK return } - flushCache(utils.CacheDestinations, args.DestinationIDs) - flushCache(utils.CacheReverseDestinations, args.ReverseDestinationIDs) - flushCache(utils.CacheRatingPlans, args.RatingPlanIDs) - flushCache(utils.CacheRatingProfiles, args.RatingProfileIDs) - flushCache(utils.CacheActions, args.ActionIDs) - flushCache(utils.CacheActionPlans, args.ActionPlanIDs) - flushCache(utils.CacheActionTriggers, args.ActionTriggerIDs) - flushCache(utils.CacheSharedGroups, args.SharedGroupIDs) - flushCache(utils.CacheResourceProfiles, args.ResourceProfileIDs) - flushCache(utils.CacheResources, args.ResourceIDs) - flushCache(utils.CacheStatQueues, args.StatsQueueIDs) - flushCache(utils.CacheThresholdProfiles, args.StatsQueueProfileIDs) - flushCache(utils.CacheThresholds, args.ThresholdIDs) - flushCache(utils.CacheThresholdProfiles, args.ThresholdProfileIDs) - flushCache(utils.CacheFilters, args.FilterIDs) - flushCache(utils.CacheSupplierProfiles, args.SupplierProfileIDs) - flushCache(utils.CacheAttributeProfiles, args.AttributeProfileIDs) - flushCache(utils.CacheChargerProfiles, args.ChargerProfileIDs) - flushCache(utils.CacheDispatcherProfiles, args.DispatcherProfileIDs) - flushCache(utils.CacheDispatcherHosts, args.DispatcherHostIDs) - flushCache(utils.CacheDispatcherRoutes, args.DispatcherRoutesIDs) + chS.flushCache(utils.CacheDestinations, args.DestinationIDs) + chS.flushCache(utils.CacheReverseDestinations, args.ReverseDestinationIDs) + chS.flushCache(utils.CacheRatingPlans, args.RatingPlanIDs) + chS.flushCache(utils.CacheRatingProfiles, args.RatingProfileIDs) + chS.flushCache(utils.CacheActions, args.ActionIDs) + chS.flushCache(utils.CacheActionPlans, args.ActionPlanIDs) + chS.flushCache(utils.CacheActionTriggers, args.ActionTriggerIDs) + chS.flushCache(utils.CacheSharedGroups, args.SharedGroupIDs) + chS.flushCache(utils.CacheResourceProfiles, args.ResourceProfileIDs) + chS.flushCache(utils.CacheResources, args.ResourceIDs) + chS.flushCache(utils.CacheStatQueues, args.StatsQueueIDs) + chS.flushCache(utils.CacheThresholdProfiles, args.StatsQueueProfileIDs) + chS.flushCache(utils.CacheThresholds, args.ThresholdIDs) + chS.flushCache(utils.CacheThresholdProfiles, args.ThresholdProfileIDs) + chS.flushCache(utils.CacheFilters, args.FilterIDs) + chS.flushCache(utils.CacheSupplierProfiles, args.SupplierProfileIDs) + chS.flushCache(utils.CacheAttributeProfiles, args.AttributeProfileIDs) + chS.flushCache(utils.CacheChargerProfiles, args.ChargerProfileIDs) + chS.flushCache(utils.CacheDispatcherProfiles, args.DispatcherProfileIDs) + chS.flushCache(utils.CacheDispatcherHosts, args.DispatcherHostIDs) + chS.flushCache(utils.CacheDispatcherRoutes, args.DispatcherRoutesIDs) //get loadIDs for all types loadIDs, err := chS.dm.GetItemLoadIDs(utils.EmptyString, false) if err != nil { @@ -412,7 +460,7 @@ func (chS *CacheS) V1FlushCache(args utils.AttrReloadCacheWithArgDispatcher, rep } cacheLoadIDs := populateCacheLoadIDs(loadIDs, args.AttrReloadCache) for key, val := range cacheLoadIDs { - Cache.Set(utils.CacheLoadIDs, key, val, nil, + chS.tCache.Set(utils.CacheLoadIDs, key, val, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } *reply = utils.OK diff --git a/engine/libtest.go b/engine/libtest.go index 3496aabdb..8bea4999d 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -621,5 +621,9 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats { Items: 0, Groups: 0, }, + utils.CacheUCH: { + Items: 0, + Groups: 0, + }, } }