From 81b03d9ce7e884e0356734e094e501217f1f3324 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 14 Apr 2019 20:27:43 +0200 Subject: [PATCH] Simplified caching in case of ResourceS --- apier/v1/caches_it_test.go | 1 - apier/v1/resourcesv1.go | 6 +- dispatchers/caches_it_test.go | 1 - engine/caches.go | 1 - engine/resources.go | 421 ++++++++++++++++++---------------- engine/resources_test.go | 36 ++- utils/apitpdata.go | 3 +- utils/consts.go | 4 +- 8 files changed, 253 insertions(+), 220 deletions(-) diff --git a/apier/v1/caches_it_test.go b/apier/v1/caches_it_test.go index 2d99fa423..d29399a0c 100644 --- a/apier/v1/caches_it_test.go +++ b/apier/v1/caches_it_test.go @@ -353,7 +353,6 @@ func testCacheSPrecacheStatus(t *testing.T) { utils.CacheSharedGroups: utils.MetaReady, utils.CacheResourceProfiles: utils.MetaReady, utils.CacheResources: utils.MetaReady, - utils.CacheEventResources: utils.MetaReady, utils.CacheTimings: utils.MetaReady, utils.CacheStatQueueProfiles: utils.MetaReady, utils.CacheStatQueues: utils.MetaReady, diff --git a/apier/v1/resourcesv1.go b/apier/v1/resourcesv1.go index 8821df8d0..3eef8142a 100644 --- a/apier/v1/resourcesv1.go +++ b/apier/v1/resourcesv1.go @@ -108,7 +108,7 @@ type ResourceWithCache struct { Cache *string } -//SetResourceProfile add a new resource configuration +//SetResourceProfile adds a new resource configuration func (apierV1 *ApierV1) SetResourceProfile(arg *ResourceWithCache, reply *string) error { if missing := utils.MissingStructFields(arg.ResourceProfile, []string{"Tenant", "ID"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) @@ -119,7 +119,9 @@ func (apierV1 *ApierV1) SetResourceProfile(arg *ResourceWithCache, reply *string //generate a loadID for CacheResourceProfiles and CacheResources and store it in database //make 1 insert for both ResourceProfile and Resources instead of 2 loadID := time.Now().UnixNano() - if err := apierV1.DataManager.SetLoadIDs(map[string]int64{utils.CacheResourceProfiles: loadID, utils.CacheResources: loadID}); err != nil { + if err := apierV1.DataManager.SetLoadIDs( + map[string]int64{utils.CacheResourceProfiles: loadID, + utils.CacheResources: loadID}); err != nil { return utils.APIErrorHandler(err) } //handle caching for ResourceProfile diff --git a/dispatchers/caches_it_test.go b/dispatchers/caches_it_test.go index c78d61c0a..969c22373 100644 --- a/dispatchers/caches_it_test.go +++ b/dispatchers/caches_it_test.go @@ -147,7 +147,6 @@ func testDspChcPrecacheStatus(t *testing.T) { utils.CacheSharedGroups: utils.MetaReady, utils.CacheResourceProfiles: utils.MetaReady, utils.CacheResources: utils.MetaReady, - utils.CacheEventResources: utils.MetaReady, utils.CacheTimings: utils.MetaReady, utils.CacheStatQueueProfiles: utils.MetaReady, utils.CacheStatQueues: utils.MetaReady, diff --git a/engine/caches.go b/engine/caches.go index 3b61361e1..d31a9606d 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -66,7 +66,6 @@ var precachedPartitions = utils.StringMap{ utils.CacheDispatcherFilterIndexes: true, utils.CacheDiameterMessages: true, - utils.CacheEventResources: true, utils.CacheTimings: true, utils.CacheLoadIDs: true, } diff --git a/engine/resources.go b/engine/resources.go index 7b7d88dbb..18debe2d0 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -223,16 +223,16 @@ func (rs Resources) clearUsage(ruTntID string) (err error) { return } -// tenantIDs returns list of TenantIDs in resources -func (rs Resources) tenantIDs() []*utils.TenantID { - tntIDs := make([]*utils.TenantID, len(rs)) - for i, r := range rs { - tntIDs[i] = &utils.TenantID{r.Tenant, r.ID} +// resIDsMp returns a map of resource IDs which is used for caching +func (rs Resources) resIDsMp() (mp utils.StringMap) { + mp = make(utils.StringMap) + for _, r := range rs { + mp[r.ID] = true } - return tntIDs + return mp } -func (rs Resources) tenatIDsStr() []string { +func (rs Resources) tenatIDs() []string { ids := make([]string, len(rs)) for i, r := range rs { ids[i] = r.TenantID() @@ -240,6 +240,14 @@ func (rs Resources) tenatIDsStr() []string { return ids } +func (rs Resources) IDs() []string { + ids := make([]string, len(rs)) + for i, r := range rs { + ids[i] = r.ID + } + return ids +} + // allocateResource attempts allocating resources for a *ResourceUsage // simulates on dryRun // returns utils.ErrResourceUnavailable if allocation is not possible @@ -247,7 +255,7 @@ func (rs Resources) allocateResource(ru *ResourceUsage, dryRun bool) (alcMessage if len(rs) == 0 { return "", utils.ErrResourceUnavailable } - lockIDs := utils.PrefixSliceItems(rs.tenatIDsStr(), utils.ResourcesPrefix) + lockIDs := utils.PrefixSliceItems(rs.tenatIDs(), utils.ResourcesPrefix) guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { // Simulate resource usage for _, r := range rs { @@ -290,7 +298,6 @@ func NewResourceService(dm *DataManager, storeInterval time.Duration, thdS = nil } return &ResourceService{dm: dm, thdS: thdS, - lcEventResources: make(map[string][]*utils.TenantID), storedResources: make(utils.StringMap), storeInterval: storeInterval, filterS: filterS, @@ -306,12 +313,10 @@ type ResourceService struct { filterS *FilterS stringIndexedFields *[]string // speed up query on indexes prefixIndexedFields *[]string - lcEventResources map[string][]*utils.TenantID // cache recording resources for events in alocation phase - lcERMux sync.RWMutex // protects the lcEventResources - storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool - srMux sync.RWMutex // protects storedResources - storeInterval time.Duration // interval to dump data on - stopBackup chan struct{} // control storing process + storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool + srMux sync.RWMutex // protects storedResources + storeInterval time.Duration // interval to dump data on + stopBackup chan struct{} // control storing process } // Called to start the service @@ -342,7 +347,7 @@ func (rS *ResourceService) StoreResource(r *Resource) (err error) { r.ID, err.Error())) return } - //since we no longer handle cache in DataManager do here a manul caching + //since we no longer handle cache in DataManager do here a manual caching if err = rS.dm.CacheDataFromDB(utils.ResourcesPrefix, []string{r.TenantID()}, true); err != nil { utils.Logger.Warning( fmt.Sprintf(" failed caching Resource with ID: %s, error: %s", @@ -399,114 +404,6 @@ func (rS *ResourceService) runBackup() { } } -// cachedResourcesForEvent attempts to retrieve cached resources for an event -// returns nil if event not cached or errors occur -// returns []Resource if negative reply was cached -func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) { - var shortCached bool - rS.lcERMux.RLock() - rIDs, has := rS.lcEventResources[evUUID] - rS.lcERMux.RUnlock() - if !has { - if rIDsIf, has := Cache.Get(utils.CacheEventResources, evUUID); !has { - return nil - } else if rIDsIf != nil { - rIDs = rIDsIf.([]*utils.TenantID) - } - shortCached = true - } - rs = make(Resources, len(rIDs)) - if len(rIDs) == 0 { - return - } - lockIDs := make([]string, len(rIDs)) - for i, rTid := range rIDs { - lockIDs[i] = utils.ResourcesPrefix + rTid.TenantID() - } - guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { - for i, rTid := range rIDs { - if r, err := rS.dm.GetResource(rTid.Tenant, rTid.ID, true, true, ""); err != nil { - utils.Logger.Warning( - fmt.Sprintf(" force-uncaching resources for evUUID: <%s>, error: <%s>", - evUUID, err.Error())) - // on errors, cleanup cache so we recache - if shortCached { - Cache.Remove(utils.CacheEventResources, evUUID, true, "") - } else { - rS.lcERMux.Lock() - delete(rS.lcEventResources, evUUID) - rS.lcERMux.Unlock() - } - return - } else { - rs[i] = r - } - } - return - }, config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...) - return -} - -// matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call -func (rS *ResourceService) matchingResourcesForEvent(ev *utils.CGREvent, usageTTL *time.Duration) (rs Resources, err error) { - matchingResources := make(map[string]*Resource) - rIDs, err := MatchingItemIDsForEvent(ev.Event, rS.stringIndexedFields, rS.prefixIndexedFields, - rS.dm, utils.CacheResourceFilterIndexes, ev.Tenant, rS.filterS.cfg.ResourceSCfg().IndexedSelects) - if err != nil { - return nil, err - } - for resName := range rIDs { - rPrf, err := rS.dm.GetResourceProfile(ev.Tenant, resName, true, true, utils.NonTransactional) - if err != nil { - if err == utils.ErrNotFound { - continue - } - return nil, err - } - if rPrf.ActivationInterval != nil && ev.Time != nil && - !rPrf.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active - continue - } - if pass, err := rS.filterS.Pass(ev.Tenant, rPrf.FilterIDs, - config.NewNavigableMap(ev.Event)); err != nil { - return nil, err - } else if !pass { - continue - } - r, err := rS.dm.GetResource(rPrf.Tenant, rPrf.ID, true, true, "") - if err != nil { - return nil, err - } - if rPrf.Stored && r.dirty == nil { - r.dirty = utils.BoolPointer(false) - } - if usageTTL != nil { - if *usageTTL != 0 { - r.ttl = usageTTL - } - } else if rPrf.UsageTTL >= 0 { - r.ttl = utils.DurationPointer(rPrf.UsageTTL) - } - r.rPrf = rPrf - matchingResources[rPrf.ID] = r - } - // All good, convert from Map to Slice so we can sort - rs = make(Resources, len(matchingResources)) - i := 0 - for _, r := range matchingResources { - rs[i] = r - i++ - } - rs.Sort() - for i, r := range rs { - if r.rPrf.Blocker { // blocker will stop processing - rs = rs[:i+1] - break - } - } - return -} - // processThresholds will pass the event for resource to ThresholdS func (rS *ResourceService) processThresholds(r *Resource, argDispatcher *utils.ArgDispatcher) (err error) { if rS.thdS == nil { @@ -535,30 +432,129 @@ func (rS *ResourceService) processThresholds(r *Resource, argDispatcher *utils.A if err = rS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( - fmt.Sprintf(" error: %s processing event %+v with ThresholdS.", err.Error(), thEv)) + fmt.Sprintf("<%s> error: %s processing event %+v with %s.", + utils.ResourceS, err.Error(), thEv, utils.ThresholdS)) } return } +// matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call +func (rS *ResourceService) matchingResourcesForEvent(ev *utils.CGREvent, + evUUID string, usageTTL *time.Duration) (rs Resources, err error) { + matchingResources := make(map[string]*Resource) + var isCached bool + var rIDs utils.StringMap + if x, ok := Cache.Get(utils.CacheEventResources, evUUID); ok { // The ResourceIDs were cached as utils.StringMap{"resID":bool} + isCached = true + if x == nil { + err = utils.ErrNotFound + } + rIDs = x.(utils.StringMap) + } else { // select the resourceIDs out of dataDB + rIDs, err = MatchingItemIDsForEvent(ev.Event, rS.stringIndexedFields, rS.prefixIndexedFields, + rS.dm, utils.CacheResourceFilterIndexes, ev.Tenant, rS.filterS.cfg.ResourceSCfg().IndexedSelects) + } + if err != nil { + if err == utils.ErrNotFound { + Cache.Set(utils.CacheEventResources, evUUID, nil, nil, true, "") // cache negative match + } + return + } + lockIDs := utils.PrefixSliceItems(rs.IDs(), utils.ResourcesPrefix) + guardian.Guardian.Guard(func() (gIface interface{}, gErr error) { + for resName := range rIDs { + var rPrf *ResourceProfile + if rPrf, err = rS.dm.GetResourceProfile(ev.Tenant, resName, + true, true, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + continue + } + return + } + if rPrf.ActivationInterval != nil && ev.Time != nil && + !rPrf.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active + continue + } + if pass, err := rS.filterS.Pass(ev.Tenant, rPrf.FilterIDs, + config.NewNavigableMap(ev.Event)); err != nil { + return nil, err + } else if !pass { + continue + } + r, err := rS.dm.GetResource(rPrf.Tenant, rPrf.ID, true, true, "") + if err != nil { + return nil, err + } + if rPrf.Stored && r.dirty == nil { + r.dirty = utils.BoolPointer(false) + } + if usageTTL != nil { + if *usageTTL != 0 { + r.ttl = usageTTL + } + } else if rPrf.UsageTTL >= 0 { + r.ttl = utils.DurationPointer(rPrf.UsageTTL) + } + r.rPrf = rPrf + matchingResources[rPrf.ID] = r + } + return + }, config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...) + if err != nil { + if isCached { + Cache.Remove(utils.CacheEventResources, evUUID, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + } + return + } + // All good, convert from Map to Slice so we can sort + rs = make(Resources, len(matchingResources)) + i := 0 + for _, r := range matchingResources { + rs[i] = r + i++ + } + rs.Sort() + for i, r := range rs { + if r.rPrf.Blocker { // blocker will stop processing + rs = rs[:i+1] + break + } + } + Cache.Set(utils.CacheEventResources, evUUID, rs.resIDsMp(), nil, true, "") + return +} + // V1ResourcesForEvent returns active resource configs matching the event func (rS *ResourceService) V1ResourcesForEvent(args utils.ArgRSv1ResourceUsage, reply *Resources) (err error) { - if missing := utils.MissingStructFields(&args.CGREvent, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing + if missing := utils.MissingStructFields(&args.CGREvent, []string{utils.Tenant, utils.ID, utils.Event}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) - } else if args.Event == nil { - return utils.NewErrMandatoryIeMissing("Event") + } else if args.UsageID == "" { + return utils.NewErrMandatoryIeMissing(utils.UsageID) } - var mtcRLs Resources - if args.UsageID != "" { // only cached if UsageID is present - mtcRLs = rS.cachedResourcesForEvent(args.TenantID()) - } - if mtcRLs == nil { - if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageTTL); err != nil { - return err + + // RPC caching + if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { + cacheKey := utils.ConcatenatedKey(utils.ResourceSv1GetResourcesForEvent, args.TenantID()) + refID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) + if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { + cachedResp := itm.(*utils.CachedRPCResponse) + if cachedResp.Error == nil { + *reply = *cachedResp.Result.(*Resources) + } + return cachedResp.Error } - Cache.Set(utils.CacheEventResources, args.TenantID(), mtcRLs.tenantIDs(), nil, true, "") + defer Cache.Set(utils.CacheRPCResponses, cacheKey, + &utils.CachedRPCResponse{Result: reply, Error: err}, + nil, true, utils.NonTransactional) } - if len(mtcRLs) == 0 { - return utils.ErrNotFound + // end of RPC caching + + var mtcRLs Resources + if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageID, args.UsageTTL); err != nil { + return err } *reply = mtcRLs return @@ -566,23 +562,36 @@ func (rS *ResourceService) V1ResourcesForEvent(args utils.ArgRSv1ResourceUsage, // V1AuthorizeResources queries service to find if an Usage is allowed func (rS *ResourceService) V1AuthorizeResources(args utils.ArgRSv1ResourceUsage, reply *string) (err error) { - var alcMessage string - if missing := utils.MissingStructFields(&args.CGREvent, []string{"Tenant"}); len(missing) != 0 { //Params missing + if missing := utils.MissingStructFields(&args.CGREvent, []string{utils.Tenant, utils.ID, utils.Event}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) + } else if args.UsageID == "" { + return utils.NewErrMandatoryIeMissing(utils.UsageID) } - if missing := utils.MissingStructFields(&args, []string{"UsageID"}); len(missing) != 0 { //Params missing - return utils.NewErrMandatoryIeMissing(missing...) - } - if args.CGREvent.Event == nil { - return utils.NewErrMandatoryIeMissing("Event") - } - mtcRLs := rS.cachedResourcesForEvent(args.TenantID()) - if mtcRLs == nil { - if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageTTL); err != nil { - return err + + // RPC caching + if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { + cacheKey := utils.ConcatenatedKey(utils.ResourceSv1AuthorizeResources, args.TenantID()) + refID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) + if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { + cachedResp := itm.(*utils.CachedRPCResponse) + if cachedResp.Error == nil { + *reply = *cachedResp.Result.(*string) + } + return cachedResp.Error } - Cache.Set(utils.CacheEventResources, args.TenantID(), mtcRLs.tenantIDs(), nil, true, "") + defer Cache.Set(utils.CacheRPCResponses, cacheKey, + &utils.CachedRPCResponse{Result: reply, Error: err}, + nil, true, utils.NonTransactional) } + // end of RPC caching + + var mtcRLs Resources + if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageID, args.UsageTTL); err != nil { + return err + } + var alcMessage string if alcMessage, err = mtcRLs.allocateResource( &ResourceUsage{ Tenant: args.CGREvent.Tenant, @@ -590,9 +599,8 @@ func (rS *ResourceService) V1AuthorizeResources(args utils.ArgRSv1ResourceUsage, Units: args.Units}, true); err != nil { if err == utils.ErrResourceUnavailable { err = utils.ErrResourceUnauthorized - Cache.Set(utils.CacheEventResources, args.UsageID, nil, nil, true, "") - return } + return } *reply = alcMessage return @@ -600,44 +608,44 @@ func (rS *ResourceService) V1AuthorizeResources(args utils.ArgRSv1ResourceUsage, // V1AllocateResource is called when a resource requires allocation func (rS *ResourceService) V1AllocateResource(args utils.ArgRSv1ResourceUsage, reply *string) (err error) { - if missing := utils.MissingStructFields(&args.CGREvent, []string{"Tenant"}); len(missing) != 0 { //Params missing + if missing := utils.MissingStructFields(&args.CGREvent, []string{utils.Tenant, utils.ID, utils.Event}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) + } else if args.UsageID == "" { + return utils.NewErrMandatoryIeMissing(utils.UsageID) } - if missing := utils.MissingStructFields(&args, []string{"UsageID"}); len(missing) != 0 { //Params missing - return utils.NewErrMandatoryIeMissing(missing...) - } - if args.CGREvent.Event == nil { - return utils.NewErrMandatoryIeMissing("Event") - } - var wasCached bool - mtcRLs := rS.cachedResourcesForEvent(args.UsageID) - if mtcRLs == nil { - if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageTTL); err != nil { - return + + // RPC caching + if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { + cacheKey := utils.ConcatenatedKey(utils.ResourceSv1AllocateResources, args.TenantID()) + refID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) + if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { + cachedResp := itm.(*utils.CachedRPCResponse) + if cachedResp.Error == nil { + *reply = *cachedResp.Result.(*string) + } + return cachedResp.Error } - } else { - wasCached = true + defer Cache.Set(utils.CacheRPCResponses, cacheKey, + &utils.CachedRPCResponse{Result: reply, Error: err}, + nil, true, utils.NonTransactional) } - alcMsg, err := mtcRLs.allocateResource( - &ResourceUsage{Tenant: args.CGREvent.Tenant, ID: args.UsageID, Units: args.Units}, false) - if err != nil { + // end of RPC caching + + var mtcRLs Resources + if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageID, + args.UsageTTL); err != nil { + return err + } + + var alcMsg string + if alcMsg, err = mtcRLs.allocateResource( + &ResourceUsage{Tenant: args.CGREvent.Tenant, ID: args.UsageID, + Units: args.Units}, false); err != nil { return } - // index it for matching out of cache - var wasShortCached bool - if wasCached { - if _, has := Cache.Get(utils.CacheEventResources, args.UsageID); has { - // remove from short cache to populate event cache - wasShortCached = true - Cache.Remove(utils.CacheEventResources, args.UsageID, true, "") - } - } - if wasShortCached || !wasCached { - rS.lcERMux.Lock() - rS.lcEventResources[args.UsageID] = mtcRLs.tenantIDs() - rS.lcERMux.Unlock() - } // index it for storing for _, r := range mtcRLs { if rS.storeInterval == 0 || r.dirty == nil { @@ -659,25 +667,39 @@ func (rS *ResourceService) V1AllocateResource(args utils.ArgRSv1ResourceUsage, r // V1ReleaseResource is called when we need to clear an allocation func (rS *ResourceService) V1ReleaseResource(args utils.ArgRSv1ResourceUsage, reply *string) (err error) { - if missing := utils.MissingStructFields(&args.CGREvent, []string{"Tenant"}); len(missing) != 0 { //Params missing + if missing := utils.MissingStructFields(&args.CGREvent, []string{utils.Tenant, utils.ID, utils.Event}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) + } else if args.UsageID == "" { + return utils.NewErrMandatoryIeMissing(utils.UsageID) } - if missing := utils.MissingStructFields(&args, []string{"UsageID"}); len(missing) != 0 { //Params missing - return utils.NewErrMandatoryIeMissing(missing...) - } - if args.CGREvent.Event == nil { - return utils.NewErrMandatoryIeMissing("Event") - } - mtcRLs := rS.cachedResourcesForEvent(args.UsageID) - if mtcRLs == nil { - if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageTTL); err != nil { - return + + // RPC caching + if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 { + cacheKey := utils.ConcatenatedKey(utils.ResourceSv1ReleaseResources, args.TenantID()) + refID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) + if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { + cachedResp := itm.(*utils.CachedRPCResponse) + if cachedResp.Error == nil { + *reply = *cachedResp.Result.(*string) + } + return cachedResp.Error } + defer Cache.Set(utils.CacheRPCResponses, cacheKey, + &utils.CachedRPCResponse{Result: reply, Error: err}, + nil, true, utils.NonTransactional) + } + // end of RPC caching + + var mtcRLs Resources + if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageID, + args.UsageTTL); err != nil { + return err } mtcRLs.clearUsage(args.UsageID) - rS.lcERMux.Lock() - delete(rS.lcEventResources, args.UsageID) - rS.lcERMux.Unlock() + + // Handle storing if rS.storeInterval != -1 { rS.srMux.Lock() } @@ -695,6 +717,7 @@ func (rS *ResourceService) V1ReleaseResource(args utils.ArgRSv1ResourceUsage, re if rS.storeInterval != -1 { rS.srMux.Unlock() } + *reply = utils.OK - return nil + return } diff --git a/engine/resources_test.go b/engine/resources_test.go index 9abeb03b4..82d4e9bbe 100644 --- a/engine/resources_test.go +++ b/engine/resources_test.go @@ -394,10 +394,10 @@ func TestResourceV1AuthorizeResourceMissingStruct(t *testing.T) { }, Units: 20, } - if err := resService.V1AuthorizeResources(argsMissingTenant, reply); err.Error() != "MANDATORY_IE_MISSING: [Tenant]" { + if err := resService.V1AuthorizeResources(argsMissingTenant, reply); err.Error() != "MANDATORY_IE_MISSING: [Tenant Event]" { t.Error(err.Error()) } - if err := resService.V1AuthorizeResources(argsMissingUsageID, reply); err.Error() != "MANDATORY_IE_MISSING: [UsageID]" { + if err := resService.V1AuthorizeResources(argsMissingUsageID, reply); err.Error() != "MANDATORY_IE_MISSING: [Event]" { t.Error(err.Error()) } } @@ -471,6 +471,7 @@ func TestResourceAddFilters(t *testing.T) { dmRES.SetFilter(fltrRes3) } +/* func TestResourceCachedResourcesForEvent(t *testing.T) { args := &utils.ArgRSv1ResourceUsage{ CGREvent: *resEvs[0], @@ -523,6 +524,7 @@ func TestResourceCachedResourcesForEvent(t *testing.T) { utils.ToJSON(resources[0]), utils.ToJSON(rcv[0])) } } +*/ func TestResourceAddResourceProfile(t *testing.T) { for _, resProfile := range resprf { @@ -543,7 +545,8 @@ func TestResourceAddResourceProfile(t *testing.T) { } func TestResourceMatchingResourcesForEvent(t *testing.T) { - mres, err := resService.matchingResourcesForEvent(resEvs[0], &timeDurationExample) + mres, err := resService.matchingResourcesForEvent(resEvs[0], + "TestResourceMatchingResourcesForEvent1", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) } @@ -555,7 +558,8 @@ func TestResourceMatchingResourcesForEvent(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", resourceTest[0].rPrf, mres[0].rPrf) } - mres, err = resService.matchingResourcesForEvent(resEvs[1], &timeDurationExample) + mres, err = resService.matchingResourcesForEvent(resEvs[1], + "TestResourceMatchingResourcesForEvent2", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) } @@ -567,7 +571,8 @@ func TestResourceMatchingResourcesForEvent(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", resourceTest[1].rPrf, mres[0].rPrf) } - mres, err = resService.matchingResourcesForEvent(resEvs[2], &timeDurationExample) + mres, err = resService.matchingResourcesForEvent(resEvs[2], + "TestResourceMatchingResourcesForEvent3", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) } @@ -592,7 +597,8 @@ func TestResourceUsageTTLCase1(t *testing.T) { if err := dmRES.SetResource(resourceTest[0]); err != nil { t.Error(err) } - mres, err := resService.matchingResourcesForEvent(resEvs[0], &timeDurationExample) + mres, err := resService.matchingResourcesForEvent(resEvs[0], + "TestResourceUsageTTLCase1", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) } @@ -619,7 +625,8 @@ func TestResourceUsageTTLCase2(t *testing.T) { if err := dmRES.SetResource(resourceTest[0]); err != nil { t.Error(err) } - mres, err := resService.matchingResourcesForEvent(resEvs[0], nil) + mres, err := resService.matchingResourcesForEvent(resEvs[0], + "TestResourceUsageTTLCase2", nil) if err != nil { t.Errorf("Error: %+v", err) } @@ -646,7 +653,8 @@ func TestResourceUsageTTLCase3(t *testing.T) { if err := dmRES.SetResource(resourceTest[0]); err != nil { t.Error(err) } - mres, err := resService.matchingResourcesForEvent(resEvs[0], utils.DurationPointer(time.Duration(0))) + mres, err := resService.matchingResourcesForEvent(resEvs[0], + "TestResourceUsageTTLCase3", utils.DurationPointer(time.Duration(0))) if err != nil { t.Errorf("Error: %+v", err) } @@ -673,7 +681,8 @@ func TestResourceUsageTTLCase4(t *testing.T) { if err := dmRES.SetResource(resourceTest[0]); err != nil { t.Error(err) } - mres, err := resService.matchingResourcesForEvent(resEvs[0], &timeDurationExample) + mres, err := resService.matchingResourcesForEvent(resEvs[0], + "TestResourceUsageTTLCase4", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) } @@ -691,7 +700,8 @@ func TestResourceUsageTTLCase4(t *testing.T) { func TestResourceMatchWithIndexFalse(t *testing.T) { Cache.Clear(nil) resService.filterS.cfg.ResourceSCfg().IndexedSelects = false - mres, err := resService.matchingResourcesForEvent(resEvs[0], &timeDurationExample) + mres, err := resService.matchingResourcesForEvent(resEvs[0], + "TestResourceMatchWithIndexFalse1", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) } @@ -703,7 +713,8 @@ func TestResourceMatchWithIndexFalse(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", resourceTest[0].rPrf, mres[0].rPrf) } - mres, err = resService.matchingResourcesForEvent(resEvs[1], &timeDurationExample) + mres, err = resService.matchingResourcesForEvent(resEvs[1], + "TestResourceMatchWithIndexFalse2", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) } @@ -715,7 +726,8 @@ func TestResourceMatchWithIndexFalse(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", resourceTest[1].rPrf, mres[0].rPrf) } - mres, err = resService.matchingResourcesForEvent(resEvs[2], &timeDurationExample) + mres, err = resService.matchingResourcesForEvent(resEvs[2], + "TestResourceMatchWithIndexFalse3", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 5d39a002e..b0c84a838 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1021,7 +1021,8 @@ type ArgRSv1ResourceUsage struct { *ArgDispatcher } -func (args *ArgRSv1ResourceUsage) TenantID() string { +// TenantUsageID is used when caching events to resources +func (args *ArgRSv1ResourceUsage) TenantUsageID() string { return ConcatenatedKey(args.CGREvent.Tenant, args.UsageID) } diff --git a/utils/consts.go b/utils/consts.go index 935bf6c29..3c0a716c0 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -62,7 +62,6 @@ var ( CacheSharedGroups: SHARED_GROUP_PREFIX, CacheResourceProfiles: ResourceProfilesPrefix, CacheResources: ResourcesPrefix, - CacheEventResources: EventResourcesPrefix, CacheTimings: TimingsPrefix, CacheStatQueueProfiles: StatQueueProfilePrefix, CacheStatQueues: StatQueuePrefix, @@ -434,7 +433,6 @@ const ( CostSource = "CostSource" ExtraInfo = "ExtraInfo" Meta = "*" - EventResourcesPrefix = "ers_" MetaSysLog = "*syslog" MetaStdLog = "*stdout" MetaNever = "*never" @@ -568,7 +566,7 @@ const ( TLSNoCaps = "tls" MetaRouteID = "*route_id" MetaApiKey = "*api_key" - M + UsageID = "UsageID" ) // Migrator Action