Mirror of https://github.com/cgrates/cgrates.git
Resources locking at API level
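The change moves locking out of the guardian.Guard closure in allocateResource: ResourceProfile and Resource items are now locked individually through guardian while an event is matched, the lock reference is stored on the item (lkID / prflLkID), and the API handlers release it once processing finishes. Below is a minimal sketch of the guardian lock/unlock pattern the diff relies on; the helper name withResourceLock is purely illustrative and not part of the change.

package engine

import (
	"github.com/cgrates/cgrates/config"
	"github.com/cgrates/cgrates/guardian"
	"github.com/cgrates/cgrates/utils"
)

// withResourceLock is an illustrative helper only: GuardIDs with an empty
// lock reference acquires the guardian lock for the given key and returns
// the reference, UnguardIDs releases it. The commit itself stores the
// reference on the Resource/ResourceProfile and unlocks at API level.
func withResourceLock(tnt, id string, fn func()) {
	lkID := guardian.Guardian.GuardIDs("",
		config.CgrConfig().GeneralCfg().LockingTimeout,
		utils.ConcatenatedKey(utils.CacheResources, tnt, id))
	defer guardian.Guardian.UnguardIDs(lkID)
	fn()
}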
@@ -48,6 +48,8 @@ type ResourceProfile struct {
Stored bool
Weight float64 // Weight to sort the resources
ThresholdIDs []string // Thresholds to check after changing Limit

lkID string // holds the reference towards guardian lock key
}

// ResourceProfileWithAPIOpts is used in replicatorV1 for dispatcher
@@ -61,6 +63,36 @@ func (rp *ResourceProfile) TenantID() string {
return utils.ConcatenatedKey(rp.Tenant, rp.ID)
}

// resourceProfileLockKey returns the ID used to lock a resourceProfile with guardian
func resourceProfileLockKey(tnt, id string) string {
return utils.ConcatenatedKey(utils.CacheResourceProfiles, tnt, id)
}

// lock will lock the resourceProfile using guardian and store the lock within r.lkID
// if lkID is passed as argument, the lock is considered as executed
func (rp *ResourceProfile) lock(lkID string) {
if lkID == utils.EmptyString {
lkID = guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout,
resourceProfileLockKey(rp.Tenant, rp.ID))
}
rp.lkID = lkID
}

// unlock will unlock the resourceProfile and clear rp.lkID
func (rp *ResourceProfile) unlock() {
if rp.lkID == utils.EmptyString {
return
}
guardian.Guardian.UnguardIDs(rp.lkID)
rp.lkID = utils.EmptyString
}

// isLocked returns the locks status of this resourceProfile
func (rp *ResourceProfile) isLocked() bool {
return rp.lkID != utils.EmptyString
}

// ResourceUsage represents an usage counted
type ResourceUsage struct {
Tenant string
@@ -89,14 +121,46 @@ func (ru *ResourceUsage) Clone() (cln *ResourceUsage) {
// Resource represents a resource in the system
// not thread safe, needs locking at process level
type Resource struct {
Tenant string
ID string
Usages map[string]*ResourceUsage
TTLIdx []string // holds ordered list of ResourceIDs based on their TTL, empty if feature is disabled
ttl *time.Duration // time to leave for this resource, picked up on each Resource initialization out of config
tUsage *float64 // sum of all usages
dirty *bool // the usages were modified, needs save, *bool so we only save if enabled in config
rPrf *ResourceProfile // for ordering purposes
Tenant string
ID string
Usages map[string]*ResourceUsage
TTLIdx []string // holds ordered list of ResourceIDs based on their TTL, empty if feature is disableda
lkID string // ID of the lock used when matching the resource
prflLkID string // ID of the lock used for the profile
ttl *time.Duration // time to leave for this resource, picked up on each Resource initialization out of config
tUsage *float64 // sum of all usages
dirty *bool // the usages were modified, needs save, *bool so we only save if enabled in config
rPrf *ResourceProfile // for ordering purposes
}

// resourceLockKey returns the ID used to lock a resource with guardian
func resourceLockKey(tnt, id string) string {
return utils.ConcatenatedKey(utils.CacheResources, tnt, id)
}

// lock will lock the resource using guardian and store the lock within r.lkID
// if lkID is passed as argument, the lock is considered as executed
func (r *Resource) lock(lkID string) {
if lkID == utils.EmptyString {
lkID = guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout,
resourceLockKey(r.Tenant, r.ID))
}
r.lkID = lkID
}

// unlock will unlock the resource and clear r.lkID
func (r *Resource) unlock() {
if r.lkID == utils.EmptyString {
return
}
guardian.Guardian.UnguardIDs(r.lkID)
r.lkID = utils.EmptyString
}

// isLocked returns the locks status of this resource
func (r *Resource) isLocked() bool {
return r.lkID != utils.EmptyString
}

// ResourceWithAPIOpts is used in replicatorV1 for dispatcher
@@ -141,8 +205,9 @@ func (r *Resource) removeExpiredUnits() {
r.tUsage = nil
}

// totalUsage returns the sum of all usage units
func (r *Resource) totalUsage() (tU float64) {
// TotalUsage returns the sum of all usage units
// Exported to be used in FilterS
func (r *Resource) TotalUsage() (tU float64) {
if r.tUsage == nil {
var tu float64
for _, ru := range r.Usages {
@@ -156,16 +221,10 @@ func (r *Resource) totalUsage() (tU float64) {
return
}

// TotalUsage returns the sum of all usage units
// Exported method to be used by filterS
func (r *Resource) TotalUsage() (tU float64) {
return r.totalUsage()
}

// Available returns the available number of units
// Exported method to be used by filterS
func (r *ResourceWithConfig) Available() float64 {
return r.Config.Limit - r.totalUsage()
return r.Config.Limit - r.TotalUsage()
}

// recordUsage records a new usage
@@ -219,6 +278,41 @@ func (rs Resources) Sort() {
sort.Slice(rs, func(i, j int) bool { return rs[i].rPrf.Weight > rs[j].rPrf.Weight })
}

// unlock will unlock resources part of this slice
func (rs Resources) unlock() {
for _, r := range rs {
r.unlock()
if r.rPrf != nil {
r.rPrf.unlock()
}
}
}

// resIDsMp returns a map of resource IDs which is used for caching
func (rs Resources) resIDsMp() (mp utils.StringSet) {
mp = make(utils.StringSet)
for _, r := range rs {
mp.Add(r.ID)
}
return mp
}

func (rs Resources) tenatIDs() []string {
ids := make([]string, len(rs))
for i, r := range rs {
ids[i] = r.TenantID()
}
return ids
}

func (rs Resources) IDs() []string {
ids := make([]string, len(rs))
for i, r := range rs {
ids[i] = r.ID
}
return ids
}

// recordUsage will record the usage in all the resource limits, failing back on errors
func (rs Resources) recordUsage(ru *ResourceUsage) (err error) {
var nonReservedIdx int // index of first resource not reserved
@@ -251,31 +345,6 @@ func (rs Resources) clearUsage(ruTntID string) (err error) {
return
}

// resIDsMp returns a map of resource IDs which is used for caching
func (rs Resources) resIDsMp() (mp utils.StringSet) {
mp = make(utils.StringSet)
for _, r := range rs {
mp.Add(r.ID)
}
return mp
}

func (rs Resources) tenatIDs() []string {
ids := make([]string, len(rs))
for i, r := range rs {
ids[i] = r.TenantID()
}
return ids
}

func (rs Resources) IDs() []string {
ids := make([]string, len(rs))
for i, r := range rs {
ids[i] = r.ID
}
return ids
}

// allocateResource attempts allocating resources for a *ResourceUsage
// simulates on dryRun
// returns utils.ErrResourceUnavailable if allocation is not possible
@@ -283,38 +352,36 @@ func (rs Resources) allocateResource(ru *ResourceUsage, dryRun bool) (alcMessage
if len(rs) == 0 {
return "", utils.ErrResourceUnavailable
}
lockIDs := utils.PrefixSliceItems(rs.tenatIDs(), utils.ResourcesPrefix)
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
// Simulate resource usage
for _, r := range rs {
r.removeExpiredUnits()
if _, hasID := r.Usages[ru.ID]; hasID && !dryRun { // update
r.clearUsage(ru.ID) // clearUsage returns error only when ru.ID does not exist in the Usages map
}
if r.rPrf == nil {
err = fmt.Errorf("empty configuration for resourceID: %s", r.TenantID())
return
}
if r.rPrf.Limit >= r.totalUsage()+ru.Units || r.rPrf.Limit == -1 {
if alcMessage == "" {
if r.rPrf.AllocationMessage != "" {
alcMessage = r.rPrf.AllocationMessage
} else {
alcMessage = r.rPrf.ID
}
// Simulate resource usage
for _, r := range rs {
r.removeExpiredUnits()
if _, hasID := r.Usages[ru.ID]; hasID && !dryRun { // update
r.clearUsage(ru.ID) // clearUsage returns error only when ru.ID does not exist in the Usages map
}
if r.rPrf == nil {
err = fmt.Errorf("empty configuration for resourceID: %s", r.TenantID())
return
}
if r.rPrf.Limit >= r.TotalUsage()+ru.Units || r.rPrf.Limit == -1 {
if alcMessage == "" {
if r.rPrf.AllocationMessage != "" {
alcMessage = r.rPrf.AllocationMessage
} else {
alcMessage = r.rPrf.ID
}
}
}
if alcMessage == "" {
err = utils.ErrResourceUnavailable
return
}
if dryRun {
return
}
err = rs.recordUsage(ru)
}
if alcMessage == "" {
err = utils.ErrResourceUnavailable
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...)
}
if dryRun {
return
}
if err = rs.recordUsage(ru); err != nil {
return utils.EmptyString, err
}
return
}
@@ -352,8 +419,59 @@ func (rS *ResourceService) Shutdown() {
utils.Logger.Info("<ResourceS> service shutdown complete")
}

// backup will regularly store resources changed to dataDB
func (rS *ResourceService) runBackup() {
storeInterval := rS.cgrcfg.ResourceSCfg().StoreInterval
if storeInterval <= 0 {
rS.loopStoped <- struct{}{}
return
}
for {
rS.storeResources()
select {
case <-rS.stopBackup:
rS.loopStoped <- struct{}{}
return
case <-time.After(storeInterval):
}
}
}

// storeResources represents one task of complete backup
func (rS *ResourceService) storeResources() {
var failedRIDs []string
for { // don't stop until we store all dirty resources
rS.srMux.Lock()
rID := rS.storedResources.GetOne()
if rID != "" {
rS.storedResources.Remove(rID)
}
rS.srMux.Unlock()
if rID == "" {
break // no more keys, backup completed
}
rIf, ok := Cache.Get(utils.CacheResources, rID)
if !ok || rIf == nil {
utils.Logger.Warning(fmt.Sprintf("<%s> failed retrieving from cache resource with ID: %s", utils.ResourceS, rID))
}
r := rIf.(*Resource)
r.lock(utils.EmptyString)
if err := rS.storeResource(r); err != nil {
failedRIDs = append(failedRIDs, rID) // record failure so we can schedule it for next backup
}
r.unlock()
// randomize the CPU load and give up thread control
runtime.Gosched()
}
if len(failedRIDs) != 0 { // there were errors on save, schedule the keys for next backup
rS.srMux.Lock()
rS.storedResources.AddSlice(failedRIDs)
rS.srMux.Unlock()
}
}

// StoreResource stores the resource in DB and corrects dirty flag
func (rS *ResourceService) StoreResource(r *Resource) (err error) {
func (rS *ResourceService) storeResource(r *Resource) (err error) {
if r.dirty == nil || !*r.dirty {
return
}
@@ -374,87 +492,51 @@ func (rS *ResourceService) StoreResource(r *Resource) (err error) {
return
}

// storeResources represents one task of complete backup
func (rS *ResourceService) storeResources() {
var failedRIDs []string
for { // don't stop until we store all dirty resources
rS.srMux.Lock()
rID := rS.storedResources.GetOne()
if rID != "" {
rS.storedResources.Remove(rID)
}
rS.srMux.Unlock()
if rID == "" {
break // no more keys, backup completed
}
if rIf, ok := Cache.Get(utils.CacheResources, rID); !ok || rIf == nil {
utils.Logger.Warning(fmt.Sprintf("<%s> failed retrieving from cache resource with ID: %s", utils.ResourceS, rID))
} else if err := rS.StoreResource(rIf.(*Resource)); err != nil {
failedRIDs = append(failedRIDs, rID) // record failure so we can schedule it for next backup
}
// randomize the CPU load and give up thread control
runtime.Gosched()
}
if len(failedRIDs) != 0 { // there were errors on save, schedule the keys for next backup
rS.srMux.Lock()
rS.storedResources.AddSlice(failedRIDs)
rS.srMux.Unlock()
}
}

// backup will regularly store resources changed to dataDB
func (rS *ResourceService) runBackup() {
storeInterval := rS.cgrcfg.ResourceSCfg().StoreInterval
if storeInterval <= 0 {
rS.loopStoped <- struct{}{}
return
}
for {
rS.storeResources()
select {
case <-rS.stopBackup:
rS.loopStoped <- struct{}{}
return
case <-time.After(storeInterval):
}
}
}

// processThresholds will pass the event for resource to ThresholdS
func (rS *ResourceService) processThresholds(r *Resource, opts map[string]interface{}) (err error) {
func (rS *ResourceService) processThresholds(rs Resources, opts map[string]interface{}) (err error) {
if len(rS.cgrcfg.ResourceSCfg().ThresholdSConns) == 0 {
return
}
var thIDs []string
if len(r.rPrf.ThresholdIDs) != 0 {
if len(r.rPrf.ThresholdIDs) == 1 && r.rPrf.ThresholdIDs[0] == utils.MetaNone {
return
}
thIDs = r.rPrf.ThresholdIDs
}
if opts == nil {
opts = make(map[string]interface{})
}
opts[utils.MetaEventType] = utils.ResourceUpdate
thEv := &ThresholdsArgsProcessEvent{ThresholdIDs: thIDs,
CGREvent: &utils.CGREvent{
Tenant: r.Tenant,
ID: utils.GenUUID(),
Event: map[string]interface{}{
utils.EventType: utils.ResourceUpdate,
utils.ResourceID: r.ID,
utils.Usage: r.totalUsage(),

var withErrs bool
for _, r := range rs {
var thIDs []string
if len(r.rPrf.ThresholdIDs) != 0 {
if len(r.rPrf.ThresholdIDs) == 1 &&
r.rPrf.ThresholdIDs[0] == utils.MetaNone {
continue
}
thIDs = r.rPrf.ThresholdIDs
}

thEv := &ThresholdsArgsProcessEvent{ThresholdIDs: thIDs,
CGREvent: &utils.CGREvent{
Tenant: r.Tenant,
ID: utils.GenUUID(),
Event: map[string]interface{}{
utils.EventType: utils.ResourceUpdate,
utils.ResourceID: r.ID,
utils.Usage: r.TotalUsage(),
},
APIOpts: opts,
},
APIOpts: opts,
},
}
var tIDs []string
if err = rS.connMgr.Call(rS.cgrcfg.ResourceSCfg().ThresholdSConns, nil,
utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with %s.",
utils.ResourceS, err.Error(), thEv, utils.ThresholdS))
withErrs = true
}
}
var tIDs []string
if err = rS.connMgr.Call(rS.cgrcfg.ResourceSCfg().ThresholdSConns, nil,
utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with %s.",
utils.ResourceS, err.Error(), thEv, utils.ThresholdS))
if withErrs {
err = utils.ErrPartiallyExecuted
}
return
}
@@ -463,18 +545,26 @@ func (rS *ResourceService) processThresholds(r *Resource, opts map[string]interf
func (rS *ResourceService) matchingResourcesForEvent(tnt string, ev *utils.CGREvent,
evUUID string, usageTTL *time.Duration) (rs Resources, err error) {
matchingResources := make(map[string]*Resource)
var isCached bool
var rIDs utils.StringSet
evNm := utils.MapStorage{
utils.MetaReq: ev.Event,
utils.MetaOpts: ev.APIOpts,
}
if x, ok := Cache.Get(utils.CacheEventResources, evUUID); ok { // The ResourceIDs were cached as utils.StringSet{"resID":bool}
isCached = true
if x == nil {
return nil, utils.ErrNotFound
}
rIDs = x.(utils.StringSet)
defer func() { // make sure we uncache if we find errors
if err != nil {
if errCh := Cache.Remove(utils.CacheEventResources, evUUID,
cacheCommit(utils.NonTransactional), utils.NonTransactional); errCh != nil {
err = errCh
}
}
return
}()

} else { // select the resourceIDs out of dataDB
rIDs, err = MatchingItemIDsForEvent(evNm,
rS.cgrcfg.ResourceSCfg().StringIndexedFields,
@@ -493,55 +583,57 @@ func (rS *ResourceService) matchingResourcesForEvent(tnt string, ev *utils.CGREv
return
}
}
lockIDs := utils.PrefixSliceItems(rs.IDs(), utils.ResourcesPrefix)
guardian.Guardian.Guard(func() (gIface interface{}, gErr error) {
for resName := range rIDs {
var rPrf *ResourceProfile
if rPrf, err = rS.dm.GetResourceProfile(tnt, resName,
true, true, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
continue
}
return
}
if rPrf.ActivationInterval != nil && ev.Time != nil &&
!rPrf.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active
for resName := range rIDs {
lkPrflID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout,
resourceProfileLockKey(tnt, resName))
var rPrf *ResourceProfile
if rPrf, err = rS.dm.GetResourceProfile(tnt, resName,
true, true, utils.NonTransactional); err != nil {
guardian.Guardian.UnguardIDs(lkPrflID)
if err == utils.ErrNotFound {
continue
}
if pass, err := rS.filterS.Pass(tnt, rPrf.FilterIDs,
evNm); err != nil {
return nil, err
} else if !pass {
continue
}
r, err := rS.dm.GetResource(rPrf.Tenant, rPrf.ID, true, true, "")
if err != nil {
return nil, err
}
if rPrf.Stored && r.dirty == nil {
r.dirty = utils.BoolPointer(false)
}
if usageTTL != nil {
if *usageTTL != 0 {
r.ttl = usageTTL
}
} else if rPrf.UsageTTL >= 0 {
r.ttl = utils.DurationPointer(rPrf.UsageTTL)
}
r.rPrf = rPrf
matchingResources[rPrf.ID] = r
return
}
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...)
if err != nil {
if isCached {
if errCh := Cache.Remove(utils.CacheEventResources, evUUID,
cacheCommit(utils.NonTransactional), utils.NonTransactional); errCh != nil {
return nil, errCh
}
rPrf.lock(lkPrflID)
if rPrf.ActivationInterval != nil && ev.Time != nil &&
!rPrf.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active
rPrf.unlock()
continue
}
return
if pass, err := rS.filterS.Pass(tnt, rPrf.FilterIDs,
evNm); err != nil {
rPrf.unlock()
return nil, err
} else if !pass {
rPrf.unlock()
continue
}
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout,
resourceLockKey(rPrf.Tenant, rPrf.ID))
r, err := rS.dm.GetResource(rPrf.Tenant, rPrf.ID, true, true, "")
if err != nil {
guardian.Guardian.UnguardIDs(lkID)
guardian.Guardian.UnguardIDs(lkPrflID)
return nil, err
}
r.lock(lkID) // pass the lock into resource so we have it as reference
if rPrf.Stored && r.dirty == nil {
r.dirty = utils.BoolPointer(false)
}
if usageTTL != nil {
if *usageTTL != 0 {
r.ttl = usageTTL
}
} else if rPrf.UsageTTL >= 0 {
r.ttl = utils.DurationPointer(rPrf.UsageTTL)
}
r.rPrf = rPrf
matchingResources[rPrf.ID] = r
}

if len(matchingResources) == 0 {
return nil, utils.ErrNotFound
}
@@ -552,12 +644,15 @@ func (rS *ResourceService) matchingResourcesForEvent(tnt string, ev *utils.CGREv
}
rs.Sort()
for i, r := range rs {
if r.rPrf.Blocker { // blocker will stop processing
if r.rPrf.Blocker && i != len(rs)-1 { // blocker will stop processing and we are not at last index
Resources(rs[i+1:]).unlock()
rs = rs[:i+1]
break
}
}
err = Cache.Set(utils.CacheEventResources, evUUID, rs.resIDsMp(), nil, true, "")
if err = Cache.Set(utils.CacheEventResources, evUUID, rs.resIDsMp(), nil, true, ""); err != nil {
rs.unlock()
}
return
}
@@ -600,6 +695,7 @@ func (rS *ResourceService) V1ResourcesForEvent(args utils.ArgRSv1ResourceUsage,
return err
}
*reply = mtcRLs
mtcRLs.unlock()
return
}
@@ -641,6 +737,8 @@ func (rS *ResourceService) V1AuthorizeResources(args utils.ArgRSv1ResourceUsage,
if mtcRLs, err = rS.matchingResourcesForEvent(tnt, args.CGREvent, args.UsageID, args.UsageTTL); err != nil {
return err
}
defer mtcRLs.unlock()

var alcMessage string
if alcMessage, err = mtcRLs.allocateResource(
&ResourceUsage{
@@ -695,6 +793,7 @@ func (rS *ResourceService) V1AllocateResources(args utils.ArgRSv1ResourceUsage,
args.UsageTTL); err != nil {
return err
}
defer mtcRLs.unlock()

var alcMsg string
if alcMsg, err = mtcRLs.allocateResource(
@@ -708,7 +807,7 @@ func (rS *ResourceService) V1AllocateResources(args utils.ArgRSv1ResourceUsage,
if rS.cgrcfg.ResourceSCfg().StoreInterval != 0 && r.dirty != nil {
if rS.cgrcfg.ResourceSCfg().StoreInterval == -1 {
*r.dirty = true
if err = rS.StoreResource(r); err != nil {
if err = rS.storeResource(r); err != nil {
return
}
} else {
@@ -718,9 +817,10 @@ func (rS *ResourceService) V1AllocateResources(args utils.ArgRSv1ResourceUsage,
rS.srMux.Unlock()
}
}
if err = rS.processThresholds(r, args.APIOpts); err != nil {
return
}

}
if err = rS.processThresholds(mtcRLs, args.APIOpts); err != nil {
return
}
*reply = alcMsg
return
@@ -765,6 +865,8 @@ func (rS *ResourceService) V1ReleaseResources(args utils.ArgRSv1ResourceUsage, r
args.UsageTTL); err != nil {
return err
}
defer mtcRLs.unlock()

if err = mtcRLs.clearUsage(args.UsageID); err != nil {
return
}
@@ -777,7 +879,7 @@ func (rS *ResourceService) V1ReleaseResources(args utils.ArgRSv1ResourceUsage, r
for _, r := range mtcRLs {
if r.dirty != nil {
if rS.cgrcfg.ResourceSCfg().StoreInterval == -1 {
if err = rS.StoreResource(r); err != nil {
if err = rS.storeResource(r); err != nil {
return
}
} else {
@@ -785,9 +887,11 @@ func (rS *ResourceService) V1ReleaseResources(args utils.ArgRSv1ResourceUsage, r
rS.storedResources.Add(r.TenantID())
}
}
if err = rS.processThresholds(r, args.APIOpts); err != nil {
return
}

}

if err = rS.processThresholds(mtcRLs, args.APIOpts); err != nil {
return
}

*reply = utils.OK
@@ -803,6 +907,13 @@ func (rS *ResourceService) V1GetResource(arg *utils.TenantIDWithAPIOpts, reply *
if tnt == utils.EmptyString {
tnt = rS.cgrcfg.GeneralCfg().DefaultTenant
}

// make sure resource is locked at process level
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout,
resourceLockKey(tnt, arg.ID))
defer guardian.Guardian.UnguardIDs(lkID)

res, err := rS.dm.GetResource(tnt, arg.ID, true, true, utils.NonTransactional)
if err != nil {
return err
@@ -824,11 +935,25 @@ func (rS *ResourceService) V1GetResourceWithConfig(arg *utils.TenantIDWithAPIOpt
if tnt == utils.EmptyString {
tnt = rS.cgrcfg.GeneralCfg().DefaultTenant
}

// make sure resource is locked at process level
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout,
resourceLockKey(tnt, arg.ID))
defer guardian.Guardian.UnguardIDs(lkID)

var res *Resource
res, err = rS.dm.GetResource(tnt, arg.ID, true, true, utils.NonTransactional)
if err != nil {
return
}

// make sure resourceProfile is locked at process level
lkPrflID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout,
resourceProfileLockKey(tnt, arg.ID))
defer guardian.Guardian.UnguardIDs(lkPrflID)

if res.rPrf == nil {
var cfg *ResourceProfile
cfg, err = rS.dm.GetResourceProfile(tnt, arg.ID, true, true, utils.NonTransactional)
@@ -837,10 +962,12 @@ func (rS *ResourceService) V1GetResourceWithConfig(arg *utils.TenantIDWithAPIOpt
}
res.rPrf = cfg
}

*reply = ResourceWithConfig{
Resource: res,
Config: res.rPrf,
}

return
}
@@ -366,7 +366,7 @@ func (rpS *RouteService) resourceUsage(resIDs []string, tenant string) (tUsage f
fmt.Sprintf("<%s> error: %s getting resource for ID : %s", utils.RouteS, err.Error(), resID))
continue
}
tUsage += res.totalUsage()
tUsage += res.TotalUsage()
}
}
return
@@ -3385,7 +3385,7 @@ func TestResourcesStoreResourceNotDirty(t *testing.T) {
dirty: utils.BoolPointer(false),
}

err := rS.StoreResource(r)
err := rS.storeResource(r)

if err != nil {
t.Error(err)
@@ -3401,7 +3401,7 @@ func TestResourcesStoreResourceOK(t *testing.T) {
dirty: utils.BoolPointer(true),
}

err := rS.StoreResource(r)
err := rS.storeResource(r)

if err != nil {
t.Error(err)
@@ -3446,7 +3446,7 @@ func TestResourcesStoreResourceErrCache(t *testing.T) {

explog := `CGRateS <> [WARNING] <ResourceS> failed caching Resource with ID: cgrates.org:RES1, error: NOT_IMPLEMENTED
`
if err := rS.StoreResource(r); err == nil ||
if err := rS.storeResource(r); err == nil ||
err.Error() != utils.ErrNotImplemented.Error() {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrNotImplemented, err)
}
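For orientation, a rough sketch of the caller-side flow the V1 handlers end up with after this change (illustrative only, error handling trimmed, and v1FlowSketch is not a real method):

// Sketch of the API-level lifecycle (illustrative): matchingResourcesForEvent
// returns resources whose profile and resource guardian locks are already
// held; the handler releases them once it is done with the matched set.
func (rS *ResourceService) v1FlowSketch(tnt string, args utils.ArgRSv1ResourceUsage) (err error) {
	var mtcRLs Resources
	if mtcRLs, err = rS.matchingResourcesForEvent(tnt, args.CGREvent,
		args.UsageID, args.UsageTTL); err != nil {
		return
	}
	defer mtcRLs.unlock() // releases both the resource and the profile locks
	if _, err = mtcRLs.allocateResource(&ResourceUsage{
		Tenant: tnt, ID: args.UsageID, Units: args.Units}, false); err != nil {
		return
	}
	return rS.processThresholds(mtcRLs, args.APIOpts)
}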