Files
cgrates/engine/reslimiter.go

294 lines
9.0 KiB
Go

/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"fmt"
"reflect"
"sort"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
type ResourceUsage struct {
ID string // Unique identifier of this resourceUsage, Eg: FreeSWITCH UUID
UsageTime time.Time // So we can expire it later
UsageUnits float64 // Number of units used
}
// ResourceLimit represents a limit imposed for accessing a resource (eg: new calls)
type ResourceLimit struct {
sync.Mutex
ID string // Identifier of this limit
Filters []*RequestFilter // Filters for the request
ActivationTime time.Time // Time when this limit becomes active
ExpiryTime time.Time
Weight float64 // Weight to sort the ResourceLimits
Limit float64 // Limit value
ActionTriggers ActionTriggers // Thresholds to check after changing Limit
UsageTTL time.Duration // Expire usage after this duration
Usage map[string]*ResourceUsage // Keep a record of usage, bounded with timestamps so we can expire too long records
usageCounter float64 // internal counter aggregating real usage of ResourceLimit
}
func (rl *ResourceLimit) removeExpiredUnits() {
for ruID, rv := range rl.Usage {
if time.Now().Sub(rv.UsageTime) <= rl.UsageTTL {
continue // not expired
}
delete(rl.Usage, ruID)
rl.usageCounter -= rv.UsageUnits
}
}
func (rl *ResourceLimit) UsedUnits() float64 {
rl.Lock()
defer rl.Unlock()
if rl.UsageTTL != 0 {
rl.removeExpiredUnits()
}
return rl.usageCounter
}
func (rl *ResourceLimit) RecordUsage(ru *ResourceUsage) (err error) {
rl.Lock()
defer rl.Unlock()
if _, hasID := rl.Usage[ru.ID]; hasID {
return fmt.Errorf("Duplicate resource usage with id: %s", ru.ID)
}
rl.Usage[ru.ID] = ru
rl.usageCounter += ru.UsageUnits
return
}
func (rl *ResourceLimit) ClearUsage(ruID string) error {
rl.Lock()
defer rl.Unlock()
ru, hasIt := rl.Usage[ruID]
if !hasIt {
return fmt.Errorf("Cannot find usage record with id: %s", ruID)
}
delete(rl.Usage, ru.ID)
rl.usageCounter -= ru.UsageUnits
return nil
}
// ResourceLimits is an ordered list of ResourceLimits based on Weight
type ResourceLimits []*ResourceLimit
// sort based on Weight
func (rls ResourceLimits) Sort() {
sort.Slice(rls, func(i, j int) bool { return rls[i].Weight > rls[j].Weight })
}
// AllowUsage checks limits and decides whether the usage is allowed
func (rls ResourceLimits) AllowUsage(usage float64) bool {
if len(rls) != 0 { // if rules defined, they need to allow usage
for _, rl := range rls {
if rl.Limit < rl.UsedUnits()+usage {
return false
}
}
}
return true
}
// RecordUsage will record the usage in all the resource limits
func (rls ResourceLimits) RecordUsage(ru *ResourceUsage) (err error) {
var failedAtIdx int
for i, rl := range rls {
if err = rl.RecordUsage(ru); err != nil {
failedAtIdx = i
break
}
}
if err != nil {
for _, rl := range rls[:failedAtIdx] {
rl.ClearUsage(ru.ID) // best effort
}
}
return
}
// ClearUsage gives back the units to the pool
func (rls ResourceLimits) ClearUsage(ruID string) {
for _, rl := range rls {
if err := rl.ClearUsage(ruID); err != nil {
utils.Logger.Warning(fmt.Sprintf("<ResourceLimits>, err: %s", err.Error()))
}
}
return
}
// Pas the config as a whole so we can ask access concurrently
func NewResourceLimiterService(cfg *config.CGRConfig, dataDB DataDB, cdrStatS rpcclient.RpcClientConnection) (*ResourceLimiterService, error) {
if cdrStatS != nil && reflect.ValueOf(cdrStatS).IsNil() {
cdrStatS = nil
}
return &ResourceLimiterService{dataDB: dataDB, cdrStatS: cdrStatS}, nil
}
// ResourcesLimiter is the service handling channel limits
type ResourceLimiterService struct {
dataDB DataDB // So we can load the data in cache and index it
cdrStatS rpcclient.RpcClientConnection
}
// Called to start the service
func (rls *ResourceLimiterService) ListenAndServe() error {
return nil
}
// Called to shutdown the service
func (rls *ResourceLimiterService) ServiceShutdown() error {
return nil
}
// matchingResourceLimitsForEvent returns ordered list of matching resources which are active by the time of the call
func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]interface{}) (resLimits ResourceLimits, err error) {
matchingResources := make(map[string]*ResourceLimit)
for fldName, fieldValIf := range ev {
fldVal, canCast := utils.CastFieldIfToString(fieldValIf)
if !canCast {
return nil, fmt.Errorf("Cannot cast field: %s into string", fldName)
}
rlIDs, err := rls.dataDB.MatchReqFilterIndex(utils.ResourceLimitsIndex, utils.ConcatenatedKey(fldName, fldVal))
if err != nil {
if err == utils.ErrNotFound {
continue
}
return nil, err
}
for resName := range rlIDs {
if _, hasIt := matchingResources[resName]; hasIt { // Already checked this RL
continue
}
rl, err := rls.dataDB.GetResourceLimit(resName, false, utils.NonTransactional)
if err != nil {
if err == utils.ErrNotFound {
continue
}
return nil, err
}
now := time.Now()
if rl.ActivationTime.After(now) || (!rl.ExpiryTime.IsZero() && rl.ExpiryTime.Before(now)) { // not active
continue
}
passAllFilters := true
for _, fltr := range rl.Filters {
if pass, err := fltr.Pass(ev, "", rls.cdrStatS); err != nil {
return nil, utils.NewErrServerError(err)
} else if !pass {
passAllFilters = false
continue
}
}
if passAllFilters {
matchingResources[rl.ID] = rl // Cannot save it here since we could have errors after and resource will remain unused
}
}
}
// Check un-indexed resources
uIdxRLIDs, err := rls.dataDB.MatchReqFilterIndex(utils.ResourceLimitsIndex, utils.ConcatenatedKey(utils.NOT_AVAILABLE, utils.NOT_AVAILABLE))
if err != nil && err != utils.ErrNotFound {
return nil, err
}
for resName := range uIdxRLIDs {
if _, hasIt := matchingResources[resName]; hasIt { // Already checked this RL
continue
}
rl, err := rls.dataDB.GetResourceLimit(resName, false, utils.NonTransactional)
if err != nil {
if err == utils.ErrNotFound {
continue
}
return nil, err
}
now := time.Now()
if rl.ActivationTime.After(now) || (!rl.ExpiryTime.IsZero() && rl.ExpiryTime.Before(now)) { // not active
continue
}
for _, fltr := range rl.Filters {
if pass, err := fltr.Pass(ev, "", rls.cdrStatS); err != nil {
return nil, utils.NewErrServerError(err)
} else if !pass {
continue
}
matchingResources[rl.ID] = rl // Cannot save it here since we could have errors after and resource will remain unused
}
}
resLimits = make(ResourceLimits, len(matchingResources))
i := 0
for _, rl := range matchingResources {
resLimits[i] = rl
i++
}
resLimits.Sort()
return resLimits, nil
}
// V1ResourceLimitsForEvent returns active resource limits matching the event
func (rls *ResourceLimiterService) V1ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error {
matchingRLForEv, err := rls.matchingResourceLimitsForEvent(ev)
if err != nil {
return utils.NewErrServerError(err)
}
*reply = matchingRLForEv
return nil
}
func (rls *ResourceLimiterService) V1AllowUsage(attrs utils.AttrRLsResourceUsage, allow *bool) (err error) {
matchingRLForEv, err := rls.matchingResourceLimitsForEvent(attrs.Event)
if err != nil {
return utils.NewErrServerError(err)
}
*allow = matchingRLForEv.AllowUsage(attrs.Units)
return
}
// V1InitiateResourceUsage is called when a session or another event needs to consume
func (rls *ResourceLimiterService) V1AllocateResource(args utils.AttrRLsResourceUsage, reply *string) (err error) {
mtcRLs, err := rls.matchingResourceLimitsForEvent(args.Event)
if err != nil {
return utils.NewErrServerError(err)
}
if !mtcRLs.AllowUsage(args.Units) {
return utils.ErrResourceUnavailable
}
if err = mtcRLs.RecordUsage(&ResourceUsage{ID: args.UsageID,
UsageTime: time.Now(), UsageUnits: args.Units}); err != nil {
return
}
*reply = utils.OK
return
}
func (rls *ResourceLimiterService) V1ReleaseResource(attrs utils.AttrRLsResourceUsage, reply *string) (err error) {
mtcRLs, err := rls.matchingResourceLimitsForEvent(attrs.Event)
if err != nil {
return utils.NewErrServerError(err)
}
mtcRLs.ClearUsage(attrs.UsageID)
*reply = utils.OK
return nil
}