mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 13:19:53 +05:00
ResourceLimits with standalone RPC object, renaming of RPC methods
This commit is contained in:
@@ -75,7 +75,7 @@ type RequestFilter struct {
|
||||
func (rf *RequestFilter) CompileValues() (err error) {
|
||||
if rf.Type == MetaRSRFields {
|
||||
if rf.rsrFields, err = utils.ParseRSRFieldsFromSlice(rf.Values); err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
} else if rf.Type == MetaCDRStats {
|
||||
rf.cdrStatSThresholds = make([]*RFStatSThreshold, len(rf.Values))
|
||||
@@ -98,7 +98,7 @@ func (rf *RequestFilter) CompileValues() (err error) {
|
||||
rf.cdrStatSThresholds[i] = st
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
// Pass is the method which should be used from outside.
|
||||
|
||||
@@ -20,11 +20,10 @@ package engine
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/cache"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
@@ -38,6 +37,7 @@ type ResourceUsage struct {
|
||||
|
||||
// ResourceLimit represents a limit imposed for accessing a resource (eg: new calls)
|
||||
type ResourceLimit struct {
|
||||
sync.Mutex
|
||||
ID string // Identifier of this limit
|
||||
Filters []*RequestFilter // Filters for the request
|
||||
ActivationTime time.Time // Time when this limit becomes active
|
||||
@@ -61,22 +61,28 @@ func (rl *ResourceLimit) removeExpiredUnits() {
|
||||
}
|
||||
|
||||
func (rl *ResourceLimit) UsedUnits() float64 {
|
||||
rl.Lock()
|
||||
defer rl.Unlock()
|
||||
if rl.UsageTTL != 0 {
|
||||
rl.removeExpiredUnits()
|
||||
}
|
||||
return rl.usageCounter
|
||||
}
|
||||
|
||||
func (rl *ResourceLimit) RecordUsage(ru *ResourceUsage) error {
|
||||
func (rl *ResourceLimit) RecordUsage(ru *ResourceUsage) (err error) {
|
||||
rl.Lock()
|
||||
defer rl.Unlock()
|
||||
if _, hasID := rl.Usage[ru.ID]; hasID {
|
||||
return fmt.Errorf("Duplicate resource usage with id: %s", ru.ID)
|
||||
}
|
||||
rl.Usage[ru.ID] = ru
|
||||
rl.usageCounter += ru.UsageUnits
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
func (rl *ResourceLimit) RemoveUsage(ruID string) error {
|
||||
func (rl *ResourceLimit) ClearUsage(ruID string) error {
|
||||
rl.Lock()
|
||||
defer rl.Unlock()
|
||||
ru, hasIt := rl.Usage[ruID]
|
||||
if !hasIt {
|
||||
return fmt.Errorf("Cannot find usage record with id: %s", ruID)
|
||||
@@ -86,23 +92,79 @@ func (rl *ResourceLimit) RemoveUsage(ruID string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResourceLimits is an ordered list of ResourceLimits based on Weight
|
||||
type ResourceLimits []*ResourceLimit
|
||||
|
||||
// sort based on Weight
|
||||
func (rls ResourceLimits) Sort() {
|
||||
sort.Slice(rls, func(i, j int) bool { return rls[i].Weight > rls[j].Weight })
|
||||
}
|
||||
|
||||
// AllowUsage checks limits and decides whether the usage is allowed
|
||||
func (rls ResourceLimits) AllowUsage(usage float64) bool {
|
||||
if len(rls) != 0 { // if rules defined, they need to allow usage
|
||||
for _, rl := range rls {
|
||||
if rl.Limit < rl.UsedUnits()+usage {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// RecordUsage will record the usage in all the resource limits
|
||||
func (rls ResourceLimits) RecordUsage(ru *ResourceUsage) (err error) {
|
||||
var failedAtIdx int
|
||||
for i, rl := range rls {
|
||||
if err = rl.RecordUsage(ru); err != nil {
|
||||
failedAtIdx = i
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
for _, rl := range rls[:failedAtIdx] {
|
||||
rl.ClearUsage(ru.ID) // best effort
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ClearUsage gives back the units to the pool
|
||||
func (rls ResourceLimits) ClearUsage(ruID string) {
|
||||
for _, rl := range rls {
|
||||
if err := rl.ClearUsage(ruID); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<ResourceLimits>, err: %s", err.Error()))
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Pas the config as a whole so we can ask access concurrently
|
||||
func NewResourceLimiterService(cfg *config.CGRConfig, dataDB DataDB, cdrStatS rpcclient.RpcClientConnection) (*ResourceLimiterService, error) {
|
||||
if cdrStatS != nil && reflect.ValueOf(cdrStatS).IsNil() {
|
||||
cdrStatS = nil
|
||||
}
|
||||
rls := &ResourceLimiterService{dataDB: dataDB, cdrStatS: cdrStatS}
|
||||
return rls, nil
|
||||
return &ResourceLimiterService{dataDB: dataDB, cdrStatS: cdrStatS}, nil
|
||||
}
|
||||
|
||||
// ResourcesLimiter is the service handling channel limits
|
||||
type ResourceLimiterService struct {
|
||||
sync.RWMutex
|
||||
dataDB DataDB // So we can load the data in cache and index it
|
||||
cdrStatS rpcclient.RpcClientConnection
|
||||
}
|
||||
|
||||
func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]interface{}) (map[string]*ResourceLimit, error) {
|
||||
// Called to start the service
|
||||
func (rls *ResourceLimiterService) ListenAndServe() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Called to shutdown the service
|
||||
func (rls *ResourceLimiterService) ServiceShutdown() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// matchingResourceLimitsForEvent returns ordered list of matching resources which are active by the time of the call
|
||||
func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]interface{}) (resLimits ResourceLimits, err error) {
|
||||
matchingResources := make(map[string]*ResourceLimit)
|
||||
for fldName, fieldValIf := range ev {
|
||||
fldVal, canCast := utils.CastFieldIfToString(fieldValIf)
|
||||
@@ -174,117 +236,58 @@ func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]
|
||||
matchingResources[rl.ID] = rl // Cannot save it here since we could have errors after and resource will remain unused
|
||||
}
|
||||
}
|
||||
return matchingResources, nil
|
||||
resLimits = make(ResourceLimits, len(matchingResources))
|
||||
i := 0
|
||||
for _, rl := range matchingResources {
|
||||
resLimits[i] = rl
|
||||
i++
|
||||
}
|
||||
resLimits.Sort()
|
||||
return resLimits, nil
|
||||
}
|
||||
|
||||
// Called to start the service
|
||||
func (rls *ResourceLimiterService) ListenAndServe() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Called to shutdown the service
|
||||
func (rls *ResourceLimiterService) ServiceShutdown() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RPC Methods
|
||||
|
||||
// V1ResourceLimitsForEvent returns active resource limits matching the event
|
||||
func (rls *ResourceLimiterService) V1ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error {
|
||||
rls.Lock() // Unknown number of RLs updated
|
||||
defer rls.Unlock()
|
||||
matchingRLForEv, err := rls.matchingResourceLimitsForEvent(ev)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
retRLs := make([]*ResourceLimit, len(matchingRLForEv))
|
||||
i := 0
|
||||
for _, rl := range matchingRLForEv {
|
||||
retRLs[i] = rl
|
||||
i++
|
||||
}
|
||||
*reply = retRLs
|
||||
*reply = matchingRLForEv
|
||||
return nil
|
||||
}
|
||||
|
||||
// Alias API for external use
|
||||
func (rls *ResourceLimiterService) ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error {
|
||||
return rls.V1ResourceLimitsForEvent(ev, reply)
|
||||
}
|
||||
|
||||
// Called when a session or another event needs to
|
||||
func (rls *ResourceLimiterService) V1InitiateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
|
||||
rls.Lock() // Unknown number of RLs updated
|
||||
defer rls.Unlock()
|
||||
func (rls *ResourceLimiterService) V1AllowUsage(attrs utils.AttrRLsResourceUsage, allow *bool) (err error) {
|
||||
matchingRLForEv, err := rls.matchingResourceLimitsForEvent(attrs.Event)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
for rlID, rl := range matchingRLForEv {
|
||||
if rl.Limit < rl.UsedUnits()+attrs.RequestedUnits {
|
||||
delete(matchingRLForEv, rlID)
|
||||
}
|
||||
if err := rl.RecordUsage(&ResourceUsage{ID: attrs.ResourceUsageID, UsageTime: time.Now(), UsageUnits: attrs.RequestedUnits}); err != nil {
|
||||
return err // Should not happen
|
||||
}
|
||||
*allow = matchingRLForEv.AllowUsage(attrs.Units)
|
||||
return
|
||||
}
|
||||
|
||||
// V1InitiateResourceUsage is called when a session or another event needs to consume
|
||||
func (rls *ResourceLimiterService) V1AllocateResource(args utils.AttrRLsResourceUsage, reply *string) (err error) {
|
||||
mtcRLs, err := rls.matchingResourceLimitsForEvent(args.Event)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if len(matchingRLForEv) == 0 {
|
||||
if !mtcRLs.AllowUsage(args.Units) {
|
||||
return utils.ErrResourceUnavailable
|
||||
}
|
||||
for _, rl := range matchingRLForEv {
|
||||
cache.Set(utils.ResourceLimitsPrefix+rl.ID, rl, true, "") // no real reason for a transaction
|
||||
if err = mtcRLs.RecordUsage(&ResourceUsage{ID: args.UsageID,
|
||||
UsageTime: time.Now(), UsageUnits: args.Units}); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
// Alias for externam methods
|
||||
func (rls *ResourceLimiterService) InitiateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
|
||||
return rls.V1InitiateResourceUsage(attrs, reply)
|
||||
}
|
||||
|
||||
func (rls *ResourceLimiterService) V1TerminateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
|
||||
rls.Lock() // Unknown number of RLs updated
|
||||
defer rls.Unlock()
|
||||
matchingRLForEv, err := rls.matchingResourceLimitsForEvent(attrs.Event)
|
||||
func (rls *ResourceLimiterService) V1ReleaseResource(attrs utils.AttrRLsResourceUsage, reply *string) (err error) {
|
||||
mtcRLs, err := rls.matchingResourceLimitsForEvent(attrs.Event)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
for _, rl := range matchingRLForEv {
|
||||
rl.RemoveUsage(attrs.ResourceUsageID)
|
||||
}
|
||||
mtcRLs.ClearUsage(attrs.UsageID)
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
// Alias for external methods
|
||||
func (rls *ResourceLimiterService) TerminateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
|
||||
return rls.V1TerminateResourceUsage(attrs, reply)
|
||||
}
|
||||
|
||||
// Make the service available as RPC internally
|
||||
func (rls *ResourceLimiterService) Call(serviceMethod string, args interface{}, reply interface{}) error {
|
||||
parts := strings.Split(serviceMethod, ".")
|
||||
if len(parts) != 2 {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
// get method
|
||||
method := reflect.ValueOf(rls).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method
|
||||
if !method.IsValid() {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// construct the params
|
||||
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
|
||||
ret := method.Call(params)
|
||||
if len(ret) != 1 {
|
||||
return utils.ErrServerError
|
||||
}
|
||||
if ret[0].Interface() == nil {
|
||||
return nil
|
||||
}
|
||||
err, ok := ret[0].Interface().(error)
|
||||
if !ok {
|
||||
return utils.ErrServerError
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package engine
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -259,8 +260,8 @@ func TestRLsLoadRLs(t *testing.T) {
|
||||
|
||||
func TestRLsMatchingResourceLimitsForEvent(t *testing.T) {
|
||||
rLS = &ResourceLimiterService{dataDB: dataStorage, cdrStatS: nil}
|
||||
eResLimits := map[string]*ResourceLimit{
|
||||
"RL1": &ResourceLimit{
|
||||
eResLimits := ResourceLimits{
|
||||
&ResourceLimit{
|
||||
ID: "RL1",
|
||||
Weight: 20,
|
||||
Filters: []*RequestFilter{
|
||||
@@ -272,7 +273,7 @@ func TestRLsMatchingResourceLimitsForEvent(t *testing.T) {
|
||||
Limit: 2,
|
||||
Usage: make(map[string]*ResourceUsage),
|
||||
},
|
||||
"RL2": &ResourceLimit{
|
||||
&ResourceLimit{
|
||||
ID: "RL2",
|
||||
Weight: 10,
|
||||
Filters: []*RequestFilter{
|
||||
@@ -287,23 +288,17 @@ func TestRLsMatchingResourceLimitsForEvent(t *testing.T) {
|
||||
}
|
||||
if resLimits, err := rLS.matchingResourceLimitsForEvent(map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"}); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(eResLimits) != len(resLimits) {
|
||||
} else if !reflect.DeepEqual(eResLimits[0].Filters[0], resLimits[0].Filters[0]) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eResLimits, resLimits)
|
||||
} else {
|
||||
for rlID := range eResLimits {
|
||||
if _, hasID := resLimits[rlID]; !hasID {
|
||||
t.Errorf("Expecting: %+v, received: %+v", eResLimits, resLimits)
|
||||
}
|
||||
}
|
||||
// Make sure the filters are what we expect to be after retrieving from cache:
|
||||
fltr := resLimits["RL1"].Filters[1]
|
||||
fltr := resLimits[0].Filters[1]
|
||||
if pass, _ := fltr.Pass(map[string]interface{}{"Subject": "10000001"}, "", nil); !pass {
|
||||
t.Errorf("Expecting RL: %+v, received: %+v", eResLimits["RL1"], resLimits["RL1"])
|
||||
t.Errorf("Expecting RL: %+v, received: %+v", eResLimits[0], resLimits[0])
|
||||
}
|
||||
if pass, _ := fltr.Pass(map[string]interface{}{"Account": "1002"}, "", nil); pass {
|
||||
t.Errorf("Expecting RL: %+v, received: %+v", eResLimits["RL1"], resLimits["RL1"])
|
||||
t.Errorf("Expecting RL: %+v, received: %+v", eResLimits[0], resLimits[0])
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -344,12 +339,12 @@ func TestRLsV1ResourceLimitsForEvent(t *testing.T) {
|
||||
|
||||
func TestRLsV1InitiateResourceUsage(t *testing.T) {
|
||||
attrRU := utils.AttrRLsResourceUsage{
|
||||
ResourceUsageID: "651a8db2-4f67-4cf8-b622-169e8a482e50",
|
||||
Event: map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"},
|
||||
RequestedUnits: 1,
|
||||
UsageID: "651a8db2-4f67-4cf8-b622-169e8a482e50",
|
||||
Event: map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"},
|
||||
Units: 1,
|
||||
}
|
||||
var reply string
|
||||
if err := rLS.V1InitiateResourceUsage(attrRU, &reply); err != nil {
|
||||
if err := rLS.V1AllocateResource(attrRU, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Error("Received reply: ", reply)
|
||||
@@ -359,21 +354,21 @@ func TestRLsV1InitiateResourceUsage(t *testing.T) {
|
||||
t.Error(err)
|
||||
} else if len(resLimits) != 2 {
|
||||
t.Errorf("Received: %+v", resLimits)
|
||||
} else if resLimits["RL1"].UsedUnits() != 1 {
|
||||
t.Errorf("RL1: %+v", resLimits["RL1"])
|
||||
} else if _, hasKey := resLimits["RL1"].Usage[attrRU.ResourceUsageID]; !hasKey {
|
||||
t.Errorf("RL1: %+v", resLimits["RL1"])
|
||||
} else if resLimits[0].UsedUnits() != 1 {
|
||||
t.Errorf("RL1: %+v", resLimits[0])
|
||||
} else if _, hasKey := resLimits[0].Usage[attrRU.UsageID]; !hasKey {
|
||||
t.Errorf("RL1: %+v", resLimits[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestRLsV1TerminateResourceUsage(t *testing.T) {
|
||||
attrRU := utils.AttrRLsResourceUsage{
|
||||
ResourceUsageID: "651a8db2-4f67-4cf8-b622-169e8a482e50",
|
||||
Event: map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"},
|
||||
RequestedUnits: 1,
|
||||
UsageID: "651a8db2-4f67-4cf8-b622-169e8a482e50",
|
||||
Event: map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"},
|
||||
Units: 1,
|
||||
}
|
||||
var reply string
|
||||
if err := rLS.V1TerminateResourceUsage(attrRU, &reply); err != nil {
|
||||
if err := rLS.V1ReleaseResource(attrRU, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Error("Received reply: ", reply)
|
||||
@@ -383,9 +378,9 @@ func TestRLsV1TerminateResourceUsage(t *testing.T) {
|
||||
t.Error(err)
|
||||
} else if len(resLimits) != 2 {
|
||||
t.Errorf("Received: %+v", resLimits)
|
||||
} else if resLimits["RL1"].UsedUnits() != 0 {
|
||||
t.Errorf("RL1: %+v", resLimits["RL1"])
|
||||
} else if _, hasKey := resLimits["RL1"].Usage[attrRU.ResourceUsageID]; hasKey {
|
||||
t.Errorf("RL1: %+v", resLimits["RL1"])
|
||||
} else if resLimits[0].UsedUnits() != 0 {
|
||||
t.Errorf("RL1: %+v", resLimits[0])
|
||||
} else if _, hasKey := resLimits[0].Usage[attrRU.UsageID]; hasKey {
|
||||
t.Errorf("RL1: %+v", resLimits[0])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +56,7 @@ var fileHandlers = map[string]func(*TPCSVImporter, string) error{
|
||||
utils.LCRS_CSV: (*TPCSVImporter).importLcrs,
|
||||
utils.USERS_CSV: (*TPCSVImporter).importUsers,
|
||||
utils.ALIASES_CSV: (*TPCSVImporter).importAliases,
|
||||
utils.ResourceLimitsCsv: (*TPCSVImporter).importResourceLimits,
|
||||
}
|
||||
|
||||
func (self *TPCSVImporter) Run() error {
|
||||
@@ -345,3 +346,14 @@ func (self *TPCSVImporter) importAliases(fn string) error {
|
||||
}
|
||||
return self.StorDb.SetTPAliases(tps)
|
||||
}
|
||||
|
||||
func (self *TPCSVImporter) importResourceLimits(fn string) error {
|
||||
if self.Verbose {
|
||||
log.Printf("Processing file: <%s> ", fn)
|
||||
}
|
||||
rls, err := self.csvr.GetTPResourceLimits(self.TPid, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return self.StorDb.SetTPResourceLimits(rls)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user