From a8bb6d332af5a79c3dc47aeb7f491d15eae6d549 Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 6 Aug 2016 18:36:19 +0200 Subject: [PATCH] ResourceLimiterService.V1InitiateResourceUsage method, autoexpire resource usage based on individual TTL --- engine/reslimiter.go | 110 ++++++++++++++++++++++++++++++++++++++++--- utils/apitpdata.go | 6 +++ utils/consts.go | 1 + 3 files changed, 110 insertions(+), 7 deletions(-) diff --git a/engine/reslimiter.go b/engine/reslimiter.go index c9263775f..6181241b0 100644 --- a/engine/reslimiter.go +++ b/engine/reslimiter.go @@ -30,15 +30,59 @@ import ( "github.com/cgrates/rpcclient" ) +type ResourceUsage struct { + ID string + UsageTime time.Time + UsageUnits float64 +} + // ResourceLimit represents a limit imposed for accessing a resource (eg: new calls) type ResourceLimit struct { - ID string // Identifier of this limit - Filters []*RequestFilter // Filters for the request - ActivationTime time.Time // Time when this limit becomes active - Weight float64 // Weight to sort the ResourceLimits - Limit float64 // Limit value - ActionTriggers ActionTriggers // Thresholds to check after changing Limit - Usage map[string]time.Time //Keep a record of usage, bounded with timestamps so we can expire too long records + ID string // Identifier of this limit + Filters []*RequestFilter // Filters for the request + ActivationTime time.Time // Time when this limit becomes active + Weight float64 // Weight to sort the ResourceLimits + Limit float64 // Limit value + ActionTriggers ActionTriggers // Thresholds to check after changing Limit + UsageTTL time.Duration // Expire usage after this duration + Usage map[string]*ResourceUsage //Keep a record of usage, bounded with timestamps so we can expire too long records + usageCounter float64 // internal counter representing real usage +} + +func (rl *ResourceLimit) removeExpiredUnits() { + for ruID, rv := range rl.Usage { + if time.Now().Sub(rv.UsageTime) <= rl.UsageTTL { + continue // not expired + } + delete(rl.Usage, ruID) + rl.usageCounter -= rv.UsageUnits + } +} + +func (rl *ResourceLimit) UsedUnits() float64 { + if rl.UsageTTL != 0 { + rl.removeExpiredUnits() + } + return rl.usageCounter +} + +func (rl *ResourceLimit) RecordUsage(ru *ResourceUsage) error { + if _, hasID := rl.Usage[ru.ID]; hasID { + return fmt.Errorf("Duplicate resource usage with id: %s", ru.ID) + } + rl.Usage[ru.ID] = ru + rl.usageCounter += ru.UsageUnits + return nil +} + +func (rl *ResourceLimit) RemoveUsage(ruID string) error { + ru, hasIt := rl.Usage[ruID] + if !hasIt { + return fmt.Errorf("Cannot find usage record with id: %s", ruID) + } + delete(rl.Usage, ru.ID) + rl.usageCounter -= ru.UsageUnits + return nil } // Pas the config as a whole so we can ask access concurrently @@ -157,6 +201,58 @@ func (rls *ResourceLimiterService) Shutdown() error { // RPC Methods available internally +// Called when a session or another event needs to +func (rls *ResourceLimiterService) V1InitiateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error { + rls.Lock() // Unknown number of RLs updated + defer rls.Unlock() + matchingResources := make(map[string]*ResourceLimit) + for fldName, fieldValIf := range attrs.Event { + _, hasIt := rls.stringIndexes[fldName] + if !hasIt { + continue + } + strVal, canCast := utils.CastFieldIfToString(fieldValIf) + if !canCast { + return fmt.Errorf("Cannot cast field: %s into string", fldName) + } + if _, hasIt := rls.stringIndexes[fldName][strVal]; !hasIt { + continue + } + for resName := range rls.stringIndexes[fldName][strVal] { + if _, hasIt := matchingResources[resName]; hasIt { // Already checked this RL + continue + } + x, err := CacheGet(utils.ResourceLimitsPrefix + resName) + if err != nil { + return err + } + rl := x.(*ResourceLimit) + for _, fltr := range rl.Filters { + if pass, err := fltr.Pass(attrs, "", rls.cdrStatS); err != nil { + return utils.NewErrServerError(err) + } else if !pass { + continue + } + if rl.Limit < rl.UsedUnits()+attrs.RequestedUnits { + continue // Not offering any usage units + } + if err := rl.RecordUsage(&ResourceUsage{ID: attrs.ResourceUsageID, UsageTime: time.Now(), UsageUnits: attrs.RequestedUnits}); err != nil { + return err // Should not happen + } + matchingResources[rl.ID] = rl // Cannot save it here since we could have errors after and resource will remain unused + } + } + } + if len(matchingResources) == 0 { + return utils.ErrResourceUnavailable + } + for _, rl := range matchingResources { + CacheSet(utils.ResourceLimitsPrefix+rl.ID, rl) + } + *reply = utils.OK + return nil +} + // Cache/Re-cache func (rls *ResourceLimiterService) V1CacheResourceLimits(attrs *utils.AttrRLsCache, reply *string) error { if err := rls.cacheResourceLimits(attrs.LoadID, attrs.ResourceLimitIDs); err != nil { diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 7ee5e75b6..7f382d6e4 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1222,3 +1222,9 @@ type AttrRLsCache struct { LoadID string ResourceLimitIDs []string } + +type AttrRLsResourceUsage struct { + ResourceUsageID string + Event map[string]interface{} + RequestedUnits float64 +} diff --git a/utils/consts.go b/utils/consts.go index bae3ef681..b7d89a9bb 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -32,6 +32,7 @@ var ( ErrUserNotFound = errors.New("USER_NOT_FOUND") ErrInsufficientCredit = errors.New("INSUFFICIENT_CREDIT") ErrNotConvertible = errors.New("NOT_CONVERTIBLE") + ErrResourceUnavailable = errors.New("RESOURCE_UNAVAILABLE") CdreCdrFormats = []string{CSV, DRYRUN, CDRE_FIXED_WIDTH} PrimaryCdrFields = []string{CGRID, CDRSOURCE, CDRHOST, ACCID, TOR, REQTYPE, DIRECTION, TENANT, CATEGORY, ACCOUNT, SUBJECT, DESTINATION, SETUP_TIME, PDD, ANSWER_TIME, USAGE,