mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-22 23:58:44 +05:00
ResourceLimiterService.V1InitiateResourceUsage method, autoexpire resource usage based on individual TTL
This commit is contained in:
@@ -30,15 +30,59 @@ import (
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
type ResourceUsage struct {
|
||||
ID string
|
||||
UsageTime time.Time
|
||||
UsageUnits float64
|
||||
}
|
||||
|
||||
// ResourceLimit represents a limit imposed for accessing a resource (eg: new calls)
|
||||
type ResourceLimit struct {
|
||||
ID string // Identifier of this limit
|
||||
Filters []*RequestFilter // Filters for the request
|
||||
ActivationTime time.Time // Time when this limit becomes active
|
||||
Weight float64 // Weight to sort the ResourceLimits
|
||||
Limit float64 // Limit value
|
||||
ActionTriggers ActionTriggers // Thresholds to check after changing Limit
|
||||
Usage map[string]time.Time //Keep a record of usage, bounded with timestamps so we can expire too long records
|
||||
ID string // Identifier of this limit
|
||||
Filters []*RequestFilter // Filters for the request
|
||||
ActivationTime time.Time // Time when this limit becomes active
|
||||
Weight float64 // Weight to sort the ResourceLimits
|
||||
Limit float64 // Limit value
|
||||
ActionTriggers ActionTriggers // Thresholds to check after changing Limit
|
||||
UsageTTL time.Duration // Expire usage after this duration
|
||||
Usage map[string]*ResourceUsage //Keep a record of usage, bounded with timestamps so we can expire too long records
|
||||
usageCounter float64 // internal counter representing real usage
|
||||
}
|
||||
|
||||
func (rl *ResourceLimit) removeExpiredUnits() {
|
||||
for ruID, rv := range rl.Usage {
|
||||
if time.Now().Sub(rv.UsageTime) <= rl.UsageTTL {
|
||||
continue // not expired
|
||||
}
|
||||
delete(rl.Usage, ruID)
|
||||
rl.usageCounter -= rv.UsageUnits
|
||||
}
|
||||
}
|
||||
|
||||
func (rl *ResourceLimit) UsedUnits() float64 {
|
||||
if rl.UsageTTL != 0 {
|
||||
rl.removeExpiredUnits()
|
||||
}
|
||||
return rl.usageCounter
|
||||
}
|
||||
|
||||
func (rl *ResourceLimit) RecordUsage(ru *ResourceUsage) error {
|
||||
if _, hasID := rl.Usage[ru.ID]; hasID {
|
||||
return fmt.Errorf("Duplicate resource usage with id: %s", ru.ID)
|
||||
}
|
||||
rl.Usage[ru.ID] = ru
|
||||
rl.usageCounter += ru.UsageUnits
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rl *ResourceLimit) RemoveUsage(ruID string) error {
|
||||
ru, hasIt := rl.Usage[ruID]
|
||||
if !hasIt {
|
||||
return fmt.Errorf("Cannot find usage record with id: %s", ruID)
|
||||
}
|
||||
delete(rl.Usage, ru.ID)
|
||||
rl.usageCounter -= ru.UsageUnits
|
||||
return nil
|
||||
}
|
||||
|
||||
// Pas the config as a whole so we can ask access concurrently
|
||||
@@ -157,6 +201,58 @@ func (rls *ResourceLimiterService) Shutdown() error {
|
||||
|
||||
// RPC Methods available internally
|
||||
|
||||
// Called when a session or another event needs to
|
||||
func (rls *ResourceLimiterService) V1InitiateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
|
||||
rls.Lock() // Unknown number of RLs updated
|
||||
defer rls.Unlock()
|
||||
matchingResources := make(map[string]*ResourceLimit)
|
||||
for fldName, fieldValIf := range attrs.Event {
|
||||
_, hasIt := rls.stringIndexes[fldName]
|
||||
if !hasIt {
|
||||
continue
|
||||
}
|
||||
strVal, canCast := utils.CastFieldIfToString(fieldValIf)
|
||||
if !canCast {
|
||||
return fmt.Errorf("Cannot cast field: %s into string", fldName)
|
||||
}
|
||||
if _, hasIt := rls.stringIndexes[fldName][strVal]; !hasIt {
|
||||
continue
|
||||
}
|
||||
for resName := range rls.stringIndexes[fldName][strVal] {
|
||||
if _, hasIt := matchingResources[resName]; hasIt { // Already checked this RL
|
||||
continue
|
||||
}
|
||||
x, err := CacheGet(utils.ResourceLimitsPrefix + resName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rl := x.(*ResourceLimit)
|
||||
for _, fltr := range rl.Filters {
|
||||
if pass, err := fltr.Pass(attrs, "", rls.cdrStatS); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
} else if !pass {
|
||||
continue
|
||||
}
|
||||
if rl.Limit < rl.UsedUnits()+attrs.RequestedUnits {
|
||||
continue // Not offering any usage units
|
||||
}
|
||||
if err := rl.RecordUsage(&ResourceUsage{ID: attrs.ResourceUsageID, UsageTime: time.Now(), UsageUnits: attrs.RequestedUnits}); err != nil {
|
||||
return err // Should not happen
|
||||
}
|
||||
matchingResources[rl.ID] = rl // Cannot save it here since we could have errors after and resource will remain unused
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(matchingResources) == 0 {
|
||||
return utils.ErrResourceUnavailable
|
||||
}
|
||||
for _, rl := range matchingResources {
|
||||
CacheSet(utils.ResourceLimitsPrefix+rl.ID, rl)
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
// Cache/Re-cache
|
||||
func (rls *ResourceLimiterService) V1CacheResourceLimits(attrs *utils.AttrRLsCache, reply *string) error {
|
||||
if err := rls.cacheResourceLimits(attrs.LoadID, attrs.ResourceLimitIDs); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user