From 1d8577d0cf7038c694711c7cdd62fb90f289e2d0 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 16 Jul 2021 16:09:21 +0200 Subject: [PATCH] Resources locking at API level --- engine/resources.go | 523 +++++++++++++++++++++++-------------- engine/routes.go | 2 +- engine/z_resources_test.go | 6 +- 3 files changed, 329 insertions(+), 202 deletions(-) diff --git a/engine/resources.go b/engine/resources.go index 6131d8fe7..fd5af0c2f 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -48,6 +48,8 @@ type ResourceProfile struct { Stored bool Weight float64 // Weight to sort the resources ThresholdIDs []string // Thresholds to check after changing Limit + + lkID string // holds the reference towards guardian lock key } // ResourceProfileWithAPIOpts is used in replicatorV1 for dispatcher @@ -61,6 +63,36 @@ func (rp *ResourceProfile) TenantID() string { return utils.ConcatenatedKey(rp.Tenant, rp.ID) } +// resourceProfileLockKey returns the ID used to lock a resourceProfile with guardian +func resourceProfileLockKey(tnt, id string) string { + return utils.ConcatenatedKey(utils.CacheResourceProfiles, tnt, id) +} + +// lock will lock the resourceProfile using guardian and store the lock within r.lkID +// if lkID is passed as argument, the lock is considered as executed +func (rp *ResourceProfile) lock(lkID string) { + if lkID == utils.EmptyString { + lkID = guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, + resourceProfileLockKey(rp.Tenant, rp.ID)) + } + rp.lkID = lkID +} + +// unlock will unlock the resourceProfile and clear rp.lkID +func (rp *ResourceProfile) unlock() { + if rp.lkID == utils.EmptyString { + return + } + guardian.Guardian.UnguardIDs(rp.lkID) + rp.lkID = utils.EmptyString +} + +// isLocked returns the locks status of this resourceProfile +func (rp *ResourceProfile) isLocked() bool { + return rp.lkID != utils.EmptyString +} + // ResourceUsage represents an usage counted type ResourceUsage struct { Tenant string @@ -89,14 +121,46 @@ func (ru *ResourceUsage) Clone() (cln *ResourceUsage) { // Resource represents a resource in the system // not thread safe, needs locking at process level type Resource struct { - Tenant string - ID string - Usages map[string]*ResourceUsage - TTLIdx []string // holds ordered list of ResourceIDs based on their TTL, empty if feature is disabled - ttl *time.Duration // time to leave for this resource, picked up on each Resource initialization out of config - tUsage *float64 // sum of all usages - dirty *bool // the usages were modified, needs save, *bool so we only save if enabled in config - rPrf *ResourceProfile // for ordering purposes + Tenant string + ID string + Usages map[string]*ResourceUsage + TTLIdx []string // holds ordered list of ResourceIDs based on their TTL, empty if feature is disableda + lkID string // ID of the lock used when matching the resource + prflLkID string // ID of the lock used for the profile + ttl *time.Duration // time to leave for this resource, picked up on each Resource initialization out of config + tUsage *float64 // sum of all usages + dirty *bool // the usages were modified, needs save, *bool so we only save if enabled in config + rPrf *ResourceProfile // for ordering purposes +} + +// resourceLockKey returns the ID used to lock a resource with guardian +func resourceLockKey(tnt, id string) string { + return utils.ConcatenatedKey(utils.CacheResources, tnt, id) +} + +// lock will lock the resource using guardian and store the lock within r.lkID +// if lkID is passed as argument, the lock is considered as executed +func (r *Resource) lock(lkID string) { + if lkID == utils.EmptyString { + lkID = guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, + resourceLockKey(r.Tenant, r.ID)) + } + r.lkID = lkID +} + +// unlock will unlock the resource and clear r.lkID +func (r *Resource) unlock() { + if r.lkID == utils.EmptyString { + return + } + guardian.Guardian.UnguardIDs(r.lkID) + r.lkID = utils.EmptyString +} + +// isLocked returns the locks status of this resource +func (r *Resource) isLocked() bool { + return r.lkID != utils.EmptyString } // ResourceWithAPIOpts is used in replicatorV1 for dispatcher @@ -141,8 +205,9 @@ func (r *Resource) removeExpiredUnits() { r.tUsage = nil } -// totalUsage returns the sum of all usage units -func (r *Resource) totalUsage() (tU float64) { +// TotalUsage returns the sum of all usage units +// Exported to be used in FilterS +func (r *Resource) TotalUsage() (tU float64) { if r.tUsage == nil { var tu float64 for _, ru := range r.Usages { @@ -156,16 +221,10 @@ func (r *Resource) totalUsage() (tU float64) { return } -// TotalUsage returns the sum of all usage units -// Exported method to be used by filterS -func (r *Resource) TotalUsage() (tU float64) { - return r.totalUsage() -} - // Available returns the available number of units // Exported method to be used by filterS func (r *ResourceWithConfig) Available() float64 { - return r.Config.Limit - r.totalUsage() + return r.Config.Limit - r.TotalUsage() } // recordUsage records a new usage @@ -219,6 +278,41 @@ func (rs Resources) Sort() { sort.Slice(rs, func(i, j int) bool { return rs[i].rPrf.Weight > rs[j].rPrf.Weight }) } +// unlock will unlock resources part of this slice +func (rs Resources) unlock() { + for _, r := range rs { + r.unlock() + if r.rPrf != nil { + r.rPrf.unlock() + } + } +} + +// resIDsMp returns a map of resource IDs which is used for caching +func (rs Resources) resIDsMp() (mp utils.StringSet) { + mp = make(utils.StringSet) + for _, r := range rs { + mp.Add(r.ID) + } + return mp +} + +func (rs Resources) tenatIDs() []string { + ids := make([]string, len(rs)) + for i, r := range rs { + ids[i] = r.TenantID() + } + return ids +} + +func (rs Resources) IDs() []string { + ids := make([]string, len(rs)) + for i, r := range rs { + ids[i] = r.ID + } + return ids +} + // recordUsage will record the usage in all the resource limits, failing back on errors func (rs Resources) recordUsage(ru *ResourceUsage) (err error) { var nonReservedIdx int // index of first resource not reserved @@ -251,31 +345,6 @@ func (rs Resources) clearUsage(ruTntID string) (err error) { return } -// resIDsMp returns a map of resource IDs which is used for caching -func (rs Resources) resIDsMp() (mp utils.StringSet) { - mp = make(utils.StringSet) - for _, r := range rs { - mp.Add(r.ID) - } - return mp -} - -func (rs Resources) tenatIDs() []string { - ids := make([]string, len(rs)) - for i, r := range rs { - ids[i] = r.TenantID() - } - return ids -} - -func (rs Resources) IDs() []string { - ids := make([]string, len(rs)) - for i, r := range rs { - ids[i] = r.ID - } - return ids -} - // allocateResource attempts allocating resources for a *ResourceUsage // simulates on dryRun // returns utils.ErrResourceUnavailable if allocation is not possible @@ -283,38 +352,36 @@ func (rs Resources) allocateResource(ru *ResourceUsage, dryRun bool) (alcMessage if len(rs) == 0 { return "", utils.ErrResourceUnavailable } - lockIDs := utils.PrefixSliceItems(rs.tenatIDs(), utils.ResourcesPrefix) - guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { - // Simulate resource usage - for _, r := range rs { - r.removeExpiredUnits() - if _, hasID := r.Usages[ru.ID]; hasID && !dryRun { // update - r.clearUsage(ru.ID) // clearUsage returns error only when ru.ID does not exist in the Usages map - } - if r.rPrf == nil { - err = fmt.Errorf("empty configuration for resourceID: %s", r.TenantID()) - return - } - if r.rPrf.Limit >= r.totalUsage()+ru.Units || r.rPrf.Limit == -1 { - if alcMessage == "" { - if r.rPrf.AllocationMessage != "" { - alcMessage = r.rPrf.AllocationMessage - } else { - alcMessage = r.rPrf.ID - } + // Simulate resource usage + for _, r := range rs { + r.removeExpiredUnits() + if _, hasID := r.Usages[ru.ID]; hasID && !dryRun { // update + r.clearUsage(ru.ID) // clearUsage returns error only when ru.ID does not exist in the Usages map + } + if r.rPrf == nil { + err = fmt.Errorf("empty configuration for resourceID: %s", r.TenantID()) + return + } + if r.rPrf.Limit >= r.TotalUsage()+ru.Units || r.rPrf.Limit == -1 { + if alcMessage == "" { + if r.rPrf.AllocationMessage != "" { + alcMessage = r.rPrf.AllocationMessage + } else { + alcMessage = r.rPrf.ID } } } - if alcMessage == "" { - err = utils.ErrResourceUnavailable - return - } - if dryRun { - return - } - err = rs.recordUsage(ru) + } + if alcMessage == "" { + err = utils.ErrResourceUnavailable return - }, config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...) + } + if dryRun { + return + } + if err = rs.recordUsage(ru); err != nil { + return utils.EmptyString, err + } return } @@ -352,8 +419,59 @@ func (rS *ResourceService) Shutdown() { utils.Logger.Info(" service shutdown complete") } +// backup will regularly store resources changed to dataDB +func (rS *ResourceService) runBackup() { + storeInterval := rS.cgrcfg.ResourceSCfg().StoreInterval + if storeInterval <= 0 { + rS.loopStoped <- struct{}{} + return + } + for { + rS.storeResources() + select { + case <-rS.stopBackup: + rS.loopStoped <- struct{}{} + return + case <-time.After(storeInterval): + } + } +} + +// storeResources represents one task of complete backup +func (rS *ResourceService) storeResources() { + var failedRIDs []string + for { // don't stop until we store all dirty resources + rS.srMux.Lock() + rID := rS.storedResources.GetOne() + if rID != "" { + rS.storedResources.Remove(rID) + } + rS.srMux.Unlock() + if rID == "" { + break // no more keys, backup completed + } + rIf, ok := Cache.Get(utils.CacheResources, rID) + if !ok || rIf == nil { + utils.Logger.Warning(fmt.Sprintf("<%s> failed retrieving from cache resource with ID: %s", utils.ResourceS, rID)) + } + r := rIf.(*Resource) + r.lock(utils.EmptyString) + if err := rS.storeResource(r); err != nil { + failedRIDs = append(failedRIDs, rID) // record failure so we can schedule it for next backup + } + r.unlock() + // randomize the CPU load and give up thread control + runtime.Gosched() + } + if len(failedRIDs) != 0 { // there were errors on save, schedule the keys for next backup + rS.srMux.Lock() + rS.storedResources.AddSlice(failedRIDs) + rS.srMux.Unlock() + } +} + // StoreResource stores the resource in DB and corrects dirty flag -func (rS *ResourceService) StoreResource(r *Resource) (err error) { +func (rS *ResourceService) storeResource(r *Resource) (err error) { if r.dirty == nil || !*r.dirty { return } @@ -374,87 +492,51 @@ func (rS *ResourceService) StoreResource(r *Resource) (err error) { return } -// storeResources represents one task of complete backup -func (rS *ResourceService) storeResources() { - var failedRIDs []string - for { // don't stop until we store all dirty resources - rS.srMux.Lock() - rID := rS.storedResources.GetOne() - if rID != "" { - rS.storedResources.Remove(rID) - } - rS.srMux.Unlock() - if rID == "" { - break // no more keys, backup completed - } - if rIf, ok := Cache.Get(utils.CacheResources, rID); !ok || rIf == nil { - utils.Logger.Warning(fmt.Sprintf("<%s> failed retrieving from cache resource with ID: %s", utils.ResourceS, rID)) - } else if err := rS.StoreResource(rIf.(*Resource)); err != nil { - failedRIDs = append(failedRIDs, rID) // record failure so we can schedule it for next backup - } - // randomize the CPU load and give up thread control - runtime.Gosched() - } - if len(failedRIDs) != 0 { // there were errors on save, schedule the keys for next backup - rS.srMux.Lock() - rS.storedResources.AddSlice(failedRIDs) - rS.srMux.Unlock() - } -} - -// backup will regularly store resources changed to dataDB -func (rS *ResourceService) runBackup() { - storeInterval := rS.cgrcfg.ResourceSCfg().StoreInterval - if storeInterval <= 0 { - rS.loopStoped <- struct{}{} - return - } - for { - rS.storeResources() - select { - case <-rS.stopBackup: - rS.loopStoped <- struct{}{} - return - case <-time.After(storeInterval): - } - } -} - // processThresholds will pass the event for resource to ThresholdS -func (rS *ResourceService) processThresholds(r *Resource, opts map[string]interface{}) (err error) { +func (rS *ResourceService) processThresholds(rs Resources, opts map[string]interface{}) (err error) { if len(rS.cgrcfg.ResourceSCfg().ThresholdSConns) == 0 { return } - var thIDs []string - if len(r.rPrf.ThresholdIDs) != 0 { - if len(r.rPrf.ThresholdIDs) == 1 && r.rPrf.ThresholdIDs[0] == utils.MetaNone { - return - } - thIDs = r.rPrf.ThresholdIDs - } if opts == nil { opts = make(map[string]interface{}) } opts[utils.MetaEventType] = utils.ResourceUpdate - thEv := &ThresholdsArgsProcessEvent{ThresholdIDs: thIDs, - CGREvent: &utils.CGREvent{ - Tenant: r.Tenant, - ID: utils.GenUUID(), - Event: map[string]interface{}{ - utils.EventType: utils.ResourceUpdate, - utils.ResourceID: r.ID, - utils.Usage: r.totalUsage(), + + var withErrs bool + for _, r := range rs { + var thIDs []string + if len(r.rPrf.ThresholdIDs) != 0 { + if len(r.rPrf.ThresholdIDs) == 1 && + r.rPrf.ThresholdIDs[0] == utils.MetaNone { + continue + } + thIDs = r.rPrf.ThresholdIDs + } + + thEv := &ThresholdsArgsProcessEvent{ThresholdIDs: thIDs, + CGREvent: &utils.CGREvent{ + Tenant: r.Tenant, + ID: utils.GenUUID(), + Event: map[string]interface{}{ + utils.EventType: utils.ResourceUpdate, + utils.ResourceID: r.ID, + utils.Usage: r.TotalUsage(), + }, + APIOpts: opts, }, - APIOpts: opts, - }, + } + var tIDs []string + if err = rS.connMgr.Call(rS.cgrcfg.ResourceSCfg().ThresholdSConns, nil, + utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing event %+v with %s.", + utils.ResourceS, err.Error(), thEv, utils.ThresholdS)) + withErrs = true + } } - var tIDs []string - if err = rS.connMgr.Call(rS.cgrcfg.ResourceSCfg().ThresholdSConns, nil, - utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing event %+v with %s.", - utils.ResourceS, err.Error(), thEv, utils.ThresholdS)) + if withErrs { + err = utils.ErrPartiallyExecuted } return } @@ -463,18 +545,26 @@ func (rS *ResourceService) processThresholds(r *Resource, opts map[string]interf func (rS *ResourceService) matchingResourcesForEvent(tnt string, ev *utils.CGREvent, evUUID string, usageTTL *time.Duration) (rs Resources, err error) { matchingResources := make(map[string]*Resource) - var isCached bool var rIDs utils.StringSet evNm := utils.MapStorage{ utils.MetaReq: ev.Event, utils.MetaOpts: ev.APIOpts, } if x, ok := Cache.Get(utils.CacheEventResources, evUUID); ok { // The ResourceIDs were cached as utils.StringSet{"resID":bool} - isCached = true if x == nil { return nil, utils.ErrNotFound } rIDs = x.(utils.StringSet) + defer func() { // make sure we uncache if we find errors + if err != nil { + if errCh := Cache.Remove(utils.CacheEventResources, evUUID, + cacheCommit(utils.NonTransactional), utils.NonTransactional); errCh != nil { + err = errCh + } + } + return + }() + } else { // select the resourceIDs out of dataDB rIDs, err = MatchingItemIDsForEvent(evNm, rS.cgrcfg.ResourceSCfg().StringIndexedFields, @@ -493,55 +583,57 @@ func (rS *ResourceService) matchingResourcesForEvent(tnt string, ev *utils.CGREv return } } - lockIDs := utils.PrefixSliceItems(rs.IDs(), utils.ResourcesPrefix) - guardian.Guardian.Guard(func() (gIface interface{}, gErr error) { - for resName := range rIDs { - var rPrf *ResourceProfile - if rPrf, err = rS.dm.GetResourceProfile(tnt, resName, - true, true, utils.NonTransactional); err != nil { - if err == utils.ErrNotFound { - continue - } - return - } - if rPrf.ActivationInterval != nil && ev.Time != nil && - !rPrf.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active + for resName := range rIDs { + lkPrflID := guardian.Guardian.GuardIDs("", + config.CgrConfig().GeneralCfg().LockingTimeout, + resourceProfileLockKey(tnt, resName)) + var rPrf *ResourceProfile + if rPrf, err = rS.dm.GetResourceProfile(tnt, resName, + true, true, utils.NonTransactional); err != nil { + guardian.Guardian.UnguardIDs(lkPrflID) + if err == utils.ErrNotFound { continue } - if pass, err := rS.filterS.Pass(tnt, rPrf.FilterIDs, - evNm); err != nil { - return nil, err - } else if !pass { - continue - } - r, err := rS.dm.GetResource(rPrf.Tenant, rPrf.ID, true, true, "") - if err != nil { - return nil, err - } - if rPrf.Stored && r.dirty == nil { - r.dirty = utils.BoolPointer(false) - } - if usageTTL != nil { - if *usageTTL != 0 { - r.ttl = usageTTL - } - } else if rPrf.UsageTTL >= 0 { - r.ttl = utils.DurationPointer(rPrf.UsageTTL) - } - r.rPrf = rPrf - matchingResources[rPrf.ID] = r + return } - return - }, config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...) - if err != nil { - if isCached { - if errCh := Cache.Remove(utils.CacheEventResources, evUUID, - cacheCommit(utils.NonTransactional), utils.NonTransactional); errCh != nil { - return nil, errCh - } + rPrf.lock(lkPrflID) + if rPrf.ActivationInterval != nil && ev.Time != nil && + !rPrf.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active + rPrf.unlock() + continue } - return + if pass, err := rS.filterS.Pass(tnt, rPrf.FilterIDs, + evNm); err != nil { + rPrf.unlock() + return nil, err + } else if !pass { + rPrf.unlock() + continue + } + lkID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, + resourceLockKey(rPrf.Tenant, rPrf.ID)) + r, err := rS.dm.GetResource(rPrf.Tenant, rPrf.ID, true, true, "") + if err != nil { + guardian.Guardian.UnguardIDs(lkID) + guardian.Guardian.UnguardIDs(lkPrflID) + return nil, err + } + r.lock(lkID) // pass the lock into resource so we have it as reference + if rPrf.Stored && r.dirty == nil { + r.dirty = utils.BoolPointer(false) + } + if usageTTL != nil { + if *usageTTL != 0 { + r.ttl = usageTTL + } + } else if rPrf.UsageTTL >= 0 { + r.ttl = utils.DurationPointer(rPrf.UsageTTL) + } + r.rPrf = rPrf + matchingResources[rPrf.ID] = r } + if len(matchingResources) == 0 { return nil, utils.ErrNotFound } @@ -552,12 +644,15 @@ func (rS *ResourceService) matchingResourcesForEvent(tnt string, ev *utils.CGREv } rs.Sort() for i, r := range rs { - if r.rPrf.Blocker { // blocker will stop processing + if r.rPrf.Blocker && i != len(rs)-1 { // blocker will stop processing and we are not at last index + Resources(rs[i+1:]).unlock() rs = rs[:i+1] break } } - err = Cache.Set(utils.CacheEventResources, evUUID, rs.resIDsMp(), nil, true, "") + if err = Cache.Set(utils.CacheEventResources, evUUID, rs.resIDsMp(), nil, true, ""); err != nil { + rs.unlock() + } return } @@ -600,6 +695,7 @@ func (rS *ResourceService) V1ResourcesForEvent(args utils.ArgRSv1ResourceUsage, return err } *reply = mtcRLs + mtcRLs.unlock() return } @@ -641,6 +737,8 @@ func (rS *ResourceService) V1AuthorizeResources(args utils.ArgRSv1ResourceUsage, if mtcRLs, err = rS.matchingResourcesForEvent(tnt, args.CGREvent, args.UsageID, args.UsageTTL); err != nil { return err } + defer mtcRLs.unlock() + var alcMessage string if alcMessage, err = mtcRLs.allocateResource( &ResourceUsage{ @@ -695,6 +793,7 @@ func (rS *ResourceService) V1AllocateResources(args utils.ArgRSv1ResourceUsage, args.UsageTTL); err != nil { return err } + defer mtcRLs.unlock() var alcMsg string if alcMsg, err = mtcRLs.allocateResource( @@ -708,7 +807,7 @@ func (rS *ResourceService) V1AllocateResources(args utils.ArgRSv1ResourceUsage, if rS.cgrcfg.ResourceSCfg().StoreInterval != 0 && r.dirty != nil { if rS.cgrcfg.ResourceSCfg().StoreInterval == -1 { *r.dirty = true - if err = rS.StoreResource(r); err != nil { + if err = rS.storeResource(r); err != nil { return } } else { @@ -718,9 +817,10 @@ func (rS *ResourceService) V1AllocateResources(args utils.ArgRSv1ResourceUsage, rS.srMux.Unlock() } } - if err = rS.processThresholds(r, args.APIOpts); err != nil { - return - } + + } + if err = rS.processThresholds(mtcRLs, args.APIOpts); err != nil { + return } *reply = alcMsg return @@ -765,6 +865,8 @@ func (rS *ResourceService) V1ReleaseResources(args utils.ArgRSv1ResourceUsage, r args.UsageTTL); err != nil { return err } + defer mtcRLs.unlock() + if err = mtcRLs.clearUsage(args.UsageID); err != nil { return } @@ -777,7 +879,7 @@ func (rS *ResourceService) V1ReleaseResources(args utils.ArgRSv1ResourceUsage, r for _, r := range mtcRLs { if r.dirty != nil { if rS.cgrcfg.ResourceSCfg().StoreInterval == -1 { - if err = rS.StoreResource(r); err != nil { + if err = rS.storeResource(r); err != nil { return } } else { @@ -785,9 +887,11 @@ func (rS *ResourceService) V1ReleaseResources(args utils.ArgRSv1ResourceUsage, r rS.storedResources.Add(r.TenantID()) } } - if err = rS.processThresholds(r, args.APIOpts); err != nil { - return - } + + } + + if err = rS.processThresholds(mtcRLs, args.APIOpts); err != nil { + return } *reply = utils.OK @@ -803,6 +907,13 @@ func (rS *ResourceService) V1GetResource(arg *utils.TenantIDWithAPIOpts, reply * if tnt == utils.EmptyString { tnt = rS.cgrcfg.GeneralCfg().DefaultTenant } + + // make sure resource is locked at process level + lkID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, + resourceLockKey(tnt, arg.ID)) + defer guardian.Guardian.UnguardIDs(lkID) + res, err := rS.dm.GetResource(tnt, arg.ID, true, true, utils.NonTransactional) if err != nil { return err @@ -824,11 +935,25 @@ func (rS *ResourceService) V1GetResourceWithConfig(arg *utils.TenantIDWithAPIOpt if tnt == utils.EmptyString { tnt = rS.cgrcfg.GeneralCfg().DefaultTenant } + + // make sure resource is locked at process level + lkID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, + resourceLockKey(tnt, arg.ID)) + defer guardian.Guardian.UnguardIDs(lkID) + var res *Resource res, err = rS.dm.GetResource(tnt, arg.ID, true, true, utils.NonTransactional) if err != nil { return } + + // make sure resourceProfile is locked at process level + lkPrflID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, + resourceProfileLockKey(tnt, arg.ID)) + defer guardian.Guardian.UnguardIDs(lkPrflID) + if res.rPrf == nil { var cfg *ResourceProfile cfg, err = rS.dm.GetResourceProfile(tnt, arg.ID, true, true, utils.NonTransactional) @@ -837,10 +962,12 @@ func (rS *ResourceService) V1GetResourceWithConfig(arg *utils.TenantIDWithAPIOpt } res.rPrf = cfg } + *reply = ResourceWithConfig{ Resource: res, Config: res.rPrf, } + return } diff --git a/engine/routes.go b/engine/routes.go index f8ade084b..594a6db4e 100644 --- a/engine/routes.go +++ b/engine/routes.go @@ -366,7 +366,7 @@ func (rpS *RouteService) resourceUsage(resIDs []string, tenant string) (tUsage f fmt.Sprintf("<%s> error: %s getting resource for ID : %s", utils.RouteS, err.Error(), resID)) continue } - tUsage += res.totalUsage() + tUsage += res.TotalUsage() } } return diff --git a/engine/z_resources_test.go b/engine/z_resources_test.go index c66a3b05f..6846375cd 100644 --- a/engine/z_resources_test.go +++ b/engine/z_resources_test.go @@ -3385,7 +3385,7 @@ func TestResourcesStoreResourceNotDirty(t *testing.T) { dirty: utils.BoolPointer(false), } - err := rS.StoreResource(r) + err := rS.storeResource(r) if err != nil { t.Error(err) @@ -3401,7 +3401,7 @@ func TestResourcesStoreResourceOK(t *testing.T) { dirty: utils.BoolPointer(true), } - err := rS.StoreResource(r) + err := rS.storeResource(r) if err != nil { t.Error(err) @@ -3446,7 +3446,7 @@ func TestResourcesStoreResourceErrCache(t *testing.T) { explog := `CGRateS <> [WARNING] failed caching Resource with ID: cgrates.org:RES1, error: NOT_IMPLEMENTED ` - if err := rS.StoreResource(r); err == nil || + if err := rS.storeResource(r); err == nil || err.Error() != utils.ErrNotImplemented.Error() { t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrNotImplemented, err) }