diff --git a/agents/radagent.go b/agents/radagent.go
index 2953fc7f8..59e3cb628 100644
--- a/agents/radagent.go
+++ b/agents/radagent.go
@@ -59,8 +59,6 @@ func (ra *RadiusAgent) handleAuth(req *radigo.Packet) (rpl *radigo.Packet, err e
return
}
-// RadiusAgent handleAcct, received req: &{RWMutex:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} dict:0xc4202e5840 secret:CGRateS.org Code:AccountingRequest Identifier:143 Authenticator:[67 77 204 122 189 209 219 22 9 176 15 228 24 246 183 7] AVPs:[0xc42023c230 0xc42023c2a0 0xc42023c310 0xc42023c460 0xc42023c4d0 0xc42023c540 0xc42023c850 0xc42023ce00 0xc42023d180 0xc42023d1f0 0xc42023d260]}
-// Identifier:144 Authenticator:[192 197 33 53 203 181 16 117 204 143 172 174 231 245 81 116] AVPs:[0xc42023d5e0 0xc42023d650 0xc42023d880 0xc42023d8f0 0xc42023da40 0xc42023db20 0xc42023dc70 0xc42023dd50 0xc42023ddc0 0xc42023de30 0xc42023dea0]}
func (ra *RadiusAgent) handleAcct(req *radigo.Packet) (rpl *radigo.Packet, err error) {
req.SetAVPValues()
utils.Logger.Debug(fmt.Sprintf("Received request: %s", utils.ToJSON(req)))
diff --git a/apier/v1/rlsv1.go b/apier/v1/rlsv1.go
new file mode 100644
index 000000000..4b6854c30
--- /dev/null
+++ b/apier/v1/rlsv1.go
@@ -0,0 +1,81 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package v1
+
+import (
+ "reflect"
+ "strings"
+
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/rpcclient"
+)
+
+func NewRLsV1(rls *engine.ResourceLimiterService) *RLsV1 {
+ return &RLsV1{rls: rls}
+}
+
+// Exports RPC from RLs
+type RLsV1 struct {
+ rls *engine.ResourceLimiterService
+}
+
+// Call implements rpcclient.RpcClientConnection interface for internal RPC
+func (rlsv1 *RLsV1) Call(serviceMethod string, args interface{}, reply interface{}) error {
+ methodSplit := strings.Split(serviceMethod, ".")
+ if len(methodSplit) != 2 {
+ return rpcclient.ErrUnsupporteServiceMethod
+ }
+ method := reflect.ValueOf(rlsv1).MethodByName(methodSplit[1])
+ if !method.IsValid() {
+ return rpcclient.ErrUnsupporteServiceMethod
+ }
+ params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
+ ret := method.Call(params)
+ if len(ret) != 1 {
+ return utils.ErrServerError
+ }
+ if ret[0].Interface() == nil {
+ return nil
+ }
+ err, ok := ret[0].Interface().(error)
+ if !ok {
+ return utils.ErrServerError
+ }
+ return err
+}
+
+// GetLimitsForEvent returns ResourceLimits matching a specific event
+func (rlsv1 *RLsV1) GetLimitsForEvent(ev map[string]interface{}, reply *[]*engine.ResourceLimit) error {
+ return rlsv1.rls.V1ResourceLimitsForEvent(ev, reply)
+}
+
+// AllowUsage checks if there are limits imposed for event
+func (rlsv1 *RLsV1) AllowUsage(args utils.AttrRLsResourceUsage, has *bool) error {
+ return rlsv1.rls.V1AllowUsage(args, has)
+}
+
+// V1InitiateResourceUsage records usage for an event
+func (rlsv1 *RLsV1) AllocateResource(args utils.AttrRLsResourceUsage, reply *string) error {
+ return rlsv1.rls.V1AllocateResource(args, reply)
+}
+
+// V1TerminateResourceUsage releases usage for an event
+func (rlsv1 *RLsV1) ReleaseResource(args utils.AttrRLsResourceUsage, reply *string) error {
+ return rlsv1.rls.V1ReleaseResource(args, reply)
+}
diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index 5a0e77e42..f9c6d8f94 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -531,8 +531,9 @@ func startResourceLimiterService(internalRLSChan, internalCdrStatSChan chan rpcc
exitChan <- true
return
}
- server.RpcRegisterName("RLsV1", rls)
- internalRLSChan <- rls
+ rlsV1 := v1.NewRLsV1(rls)
+ server.RpcRegister(rlsV1)
+ internalRLSChan <- rlsV1
}
func startRpc(server *utils.Server, internalRaterChan,
diff --git a/data/storage/mysql/create_tariffplan_tables.sql b/data/storage/mysql/create_tariffplan_tables.sql
index 8b12c4e60..9c9ba3318 100644
--- a/data/storage/mysql/create_tariffplan_tables.sql
+++ b/data/storage/mysql/create_tariffplan_tables.sql
@@ -406,7 +406,7 @@ CREATE TABLE tp_resource_limits (
`created_at` TIMESTAMP,
PRIMARY KEY (`id`),
KEY `tpid` (`tpid`),
- UNIQUE KEY `unique_tp_resource_limits` (`tpid`, `tag`)
+ UNIQUE KEY `unique_tp_resource_limits` (`tpid`, `tag`, `filter_type`, `filter_field_name`)
);
DROP TABLE IF EXISTS versions;
diff --git a/data/storage/postgres/create_tariffplan_tables.sql b/data/storage/postgres/create_tariffplan_tables.sql
index b4192d1cc..e6ce162a5 100644
--- a/data/storage/postgres/create_tariffplan_tables.sql
+++ b/data/storage/postgres/create_tariffplan_tables.sql
@@ -401,7 +401,7 @@ CREATE TABLE tp_resource_limits (
"created_at" TIMESTAMP WITH TIME ZONE
);
CREATE INDEX tp_resource_limits_idx ON tp_resource_limits (tpid);
-CREATE INDEX tp_resource_limits_unique ON tp_resource_limits ("tpid", "tag");
+CREATE INDEX tp_resource_limits_unique ON tp_resource_limits ("tpid", "tag", "filter_type", "filter_field_name");
DROP TABLE IF EXISTS versions;
CREATE TABLE versions (
diff --git a/engine/reqfilter.go b/engine/reqfilter.go
index 64b1ef358..fc5e9f0da 100644
--- a/engine/reqfilter.go
+++ b/engine/reqfilter.go
@@ -75,7 +75,7 @@ type RequestFilter struct {
func (rf *RequestFilter) CompileValues() (err error) {
if rf.Type == MetaRSRFields {
if rf.rsrFields, err = utils.ParseRSRFieldsFromSlice(rf.Values); err != nil {
- return err
+ return
}
} else if rf.Type == MetaCDRStats {
rf.cdrStatSThresholds = make([]*RFStatSThreshold, len(rf.Values))
@@ -98,7 +98,7 @@ func (rf *RequestFilter) CompileValues() (err error) {
rf.cdrStatSThresholds[i] = st
}
}
- return nil
+ return
}
// Pass is the method which should be used from outside.
diff --git a/engine/reslimiter.go b/engine/reslimiter.go
index a45c5c271..baeec2d29 100644
--- a/engine/reslimiter.go
+++ b/engine/reslimiter.go
@@ -20,11 +20,10 @@ package engine
import (
"fmt"
"reflect"
- "strings"
+ "sort"
"sync"
"time"
- "github.com/cgrates/cgrates/cache"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
@@ -38,6 +37,7 @@ type ResourceUsage struct {
// ResourceLimit represents a limit imposed for accessing a resource (eg: new calls)
type ResourceLimit struct {
+ sync.Mutex
ID string // Identifier of this limit
Filters []*RequestFilter // Filters for the request
ActivationTime time.Time // Time when this limit becomes active
@@ -61,22 +61,28 @@ func (rl *ResourceLimit) removeExpiredUnits() {
}
func (rl *ResourceLimit) UsedUnits() float64 {
+ rl.Lock()
+ defer rl.Unlock()
if rl.UsageTTL != 0 {
rl.removeExpiredUnits()
}
return rl.usageCounter
}
-func (rl *ResourceLimit) RecordUsage(ru *ResourceUsage) error {
+func (rl *ResourceLimit) RecordUsage(ru *ResourceUsage) (err error) {
+ rl.Lock()
+ defer rl.Unlock()
if _, hasID := rl.Usage[ru.ID]; hasID {
return fmt.Errorf("Duplicate resource usage with id: %s", ru.ID)
}
rl.Usage[ru.ID] = ru
rl.usageCounter += ru.UsageUnits
- return nil
+ return
}
-func (rl *ResourceLimit) RemoveUsage(ruID string) error {
+func (rl *ResourceLimit) ClearUsage(ruID string) error {
+ rl.Lock()
+ defer rl.Unlock()
ru, hasIt := rl.Usage[ruID]
if !hasIt {
return fmt.Errorf("Cannot find usage record with id: %s", ruID)
@@ -86,23 +92,79 @@ func (rl *ResourceLimit) RemoveUsage(ruID string) error {
return nil
}
+// ResourceLimits is an ordered list of ResourceLimits based on Weight
+type ResourceLimits []*ResourceLimit
+
+// sort based on Weight
+func (rls ResourceLimits) Sort() {
+ sort.Slice(rls, func(i, j int) bool { return rls[i].Weight > rls[j].Weight })
+}
+
+// AllowUsage checks limits and decides whether the usage is allowed
+func (rls ResourceLimits) AllowUsage(usage float64) bool {
+ if len(rls) != 0 { // if rules defined, they need to allow usage
+ for _, rl := range rls {
+ if rl.Limit < rl.UsedUnits()+usage {
+ return false
+ }
+ }
+ }
+ return true
+}
+
+// RecordUsage will record the usage in all the resource limits
+func (rls ResourceLimits) RecordUsage(ru *ResourceUsage) (err error) {
+ var failedAtIdx int
+ for i, rl := range rls {
+ if err = rl.RecordUsage(ru); err != nil {
+ failedAtIdx = i
+ break
+ }
+ }
+ if err != nil {
+ for _, rl := range rls[:failedAtIdx] {
+ rl.ClearUsage(ru.ID) // best effort
+ }
+ }
+ return
+}
+
+// ClearUsage gives back the units to the pool
+func (rls ResourceLimits) ClearUsage(ruID string) {
+ for _, rl := range rls {
+ if err := rl.ClearUsage(ruID); err != nil {
+ utils.Logger.Warning(fmt.Sprintf(", err: %s", err.Error()))
+ }
+ }
+ return
+}
+
// Pas the config as a whole so we can ask access concurrently
func NewResourceLimiterService(cfg *config.CGRConfig, dataDB DataDB, cdrStatS rpcclient.RpcClientConnection) (*ResourceLimiterService, error) {
if cdrStatS != nil && reflect.ValueOf(cdrStatS).IsNil() {
cdrStatS = nil
}
- rls := &ResourceLimiterService{dataDB: dataDB, cdrStatS: cdrStatS}
- return rls, nil
+ return &ResourceLimiterService{dataDB: dataDB, cdrStatS: cdrStatS}, nil
}
// ResourcesLimiter is the service handling channel limits
type ResourceLimiterService struct {
- sync.RWMutex
dataDB DataDB // So we can load the data in cache and index it
cdrStatS rpcclient.RpcClientConnection
}
-func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]interface{}) (map[string]*ResourceLimit, error) {
+// Called to start the service
+func (rls *ResourceLimiterService) ListenAndServe() error {
+ return nil
+}
+
+// Called to shutdown the service
+func (rls *ResourceLimiterService) ServiceShutdown() error {
+ return nil
+}
+
+// matchingResourceLimitsForEvent returns ordered list of matching resources which are active by the time of the call
+func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]interface{}) (resLimits ResourceLimits, err error) {
matchingResources := make(map[string]*ResourceLimit)
for fldName, fieldValIf := range ev {
fldVal, canCast := utils.CastFieldIfToString(fieldValIf)
@@ -174,117 +236,58 @@ func (rls *ResourceLimiterService) matchingResourceLimitsForEvent(ev map[string]
matchingResources[rl.ID] = rl // Cannot save it here since we could have errors after and resource will remain unused
}
}
- return matchingResources, nil
+ resLimits = make(ResourceLimits, len(matchingResources))
+ i := 0
+ for _, rl := range matchingResources {
+ resLimits[i] = rl
+ i++
+ }
+ resLimits.Sort()
+ return resLimits, nil
}
-// Called to start the service
-func (rls *ResourceLimiterService) ListenAndServe() error {
- return nil
-}
-
-// Called to shutdown the service
-func (rls *ResourceLimiterService) ServiceShutdown() error {
- return nil
-}
-
-// RPC Methods
-
+// V1ResourceLimitsForEvent returns active resource limits matching the event
func (rls *ResourceLimiterService) V1ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error {
- rls.Lock() // Unknown number of RLs updated
- defer rls.Unlock()
matchingRLForEv, err := rls.matchingResourceLimitsForEvent(ev)
if err != nil {
return utils.NewErrServerError(err)
}
- retRLs := make([]*ResourceLimit, len(matchingRLForEv))
- i := 0
- for _, rl := range matchingRLForEv {
- retRLs[i] = rl
- i++
- }
- *reply = retRLs
+ *reply = matchingRLForEv
return nil
}
-// Alias API for external use
-func (rls *ResourceLimiterService) ResourceLimitsForEvent(ev map[string]interface{}, reply *[]*ResourceLimit) error {
- return rls.V1ResourceLimitsForEvent(ev, reply)
-}
-
-// Called when a session or another event needs to
-func (rls *ResourceLimiterService) V1InitiateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
- rls.Lock() // Unknown number of RLs updated
- defer rls.Unlock()
+func (rls *ResourceLimiterService) V1AllowUsage(attrs utils.AttrRLsResourceUsage, allow *bool) (err error) {
matchingRLForEv, err := rls.matchingResourceLimitsForEvent(attrs.Event)
if err != nil {
return utils.NewErrServerError(err)
}
- for rlID, rl := range matchingRLForEv {
- if rl.Limit < rl.UsedUnits()+attrs.RequestedUnits {
- delete(matchingRLForEv, rlID)
- }
- if err := rl.RecordUsage(&ResourceUsage{ID: attrs.ResourceUsageID, UsageTime: time.Now(), UsageUnits: attrs.RequestedUnits}); err != nil {
- return err // Should not happen
- }
+ *allow = matchingRLForEv.AllowUsage(attrs.Units)
+ return
+}
+
+// V1InitiateResourceUsage is called when a session or another event needs to consume
+func (rls *ResourceLimiterService) V1AllocateResource(args utils.AttrRLsResourceUsage, reply *string) (err error) {
+ mtcRLs, err := rls.matchingResourceLimitsForEvent(args.Event)
+ if err != nil {
+ return utils.NewErrServerError(err)
}
- if len(matchingRLForEv) == 0 {
+ if !mtcRLs.AllowUsage(args.Units) {
return utils.ErrResourceUnavailable
}
- for _, rl := range matchingRLForEv {
- cache.Set(utils.ResourceLimitsPrefix+rl.ID, rl, true, "") // no real reason for a transaction
+ if err = mtcRLs.RecordUsage(&ResourceUsage{ID: args.UsageID,
+ UsageTime: time.Now(), UsageUnits: args.Units}); err != nil {
+ return
}
*reply = utils.OK
- return nil
+ return
}
-// Alias for externam methods
-func (rls *ResourceLimiterService) InitiateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
- return rls.V1InitiateResourceUsage(attrs, reply)
-}
-
-func (rls *ResourceLimiterService) V1TerminateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
- rls.Lock() // Unknown number of RLs updated
- defer rls.Unlock()
- matchingRLForEv, err := rls.matchingResourceLimitsForEvent(attrs.Event)
+func (rls *ResourceLimiterService) V1ReleaseResource(attrs utils.AttrRLsResourceUsage, reply *string) (err error) {
+ mtcRLs, err := rls.matchingResourceLimitsForEvent(attrs.Event)
if err != nil {
return utils.NewErrServerError(err)
}
- for _, rl := range matchingRLForEv {
- rl.RemoveUsage(attrs.ResourceUsageID)
- }
+ mtcRLs.ClearUsage(attrs.UsageID)
*reply = utils.OK
return nil
}
-
-// Alias for external methods
-func (rls *ResourceLimiterService) TerminateResourceUsage(attrs utils.AttrRLsResourceUsage, reply *string) error {
- return rls.V1TerminateResourceUsage(attrs, reply)
-}
-
-// Make the service available as RPC internally
-func (rls *ResourceLimiterService) Call(serviceMethod string, args interface{}, reply interface{}) error {
- parts := strings.Split(serviceMethod, ".")
- if len(parts) != 2 {
- return utils.ErrNotImplemented
- }
- // get method
- method := reflect.ValueOf(rls).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method
- if !method.IsValid() {
- return utils.ErrNotImplemented
- }
-
- // construct the params
- params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
- ret := method.Call(params)
- if len(ret) != 1 {
- return utils.ErrServerError
- }
- if ret[0].Interface() == nil {
- return nil
- }
- err, ok := ret[0].Interface().(error)
- if !ok {
- return utils.ErrServerError
- }
- return err
-}
diff --git a/engine/reslimiter_test.go b/engine/reslimiter_test.go
index 0c7d78073..4ca0e5e31 100644
--- a/engine/reslimiter_test.go
+++ b/engine/reslimiter_test.go
@@ -18,6 +18,7 @@ along with this program. If not, see
package engine
import (
+ "reflect"
"testing"
"time"
@@ -259,8 +260,8 @@ func TestRLsLoadRLs(t *testing.T) {
func TestRLsMatchingResourceLimitsForEvent(t *testing.T) {
rLS = &ResourceLimiterService{dataDB: dataStorage, cdrStatS: nil}
- eResLimits := map[string]*ResourceLimit{
- "RL1": &ResourceLimit{
+ eResLimits := ResourceLimits{
+ &ResourceLimit{
ID: "RL1",
Weight: 20,
Filters: []*RequestFilter{
@@ -272,7 +273,7 @@ func TestRLsMatchingResourceLimitsForEvent(t *testing.T) {
Limit: 2,
Usage: make(map[string]*ResourceUsage),
},
- "RL2": &ResourceLimit{
+ &ResourceLimit{
ID: "RL2",
Weight: 10,
Filters: []*RequestFilter{
@@ -287,23 +288,17 @@ func TestRLsMatchingResourceLimitsForEvent(t *testing.T) {
}
if resLimits, err := rLS.matchingResourceLimitsForEvent(map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"}); err != nil {
t.Error(err)
- } else if len(eResLimits) != len(resLimits) {
+ } else if !reflect.DeepEqual(eResLimits[0].Filters[0], resLimits[0].Filters[0]) {
t.Errorf("Expecting: %+v, received: %+v", eResLimits, resLimits)
} else {
- for rlID := range eResLimits {
- if _, hasID := resLimits[rlID]; !hasID {
- t.Errorf("Expecting: %+v, received: %+v", eResLimits, resLimits)
- }
- }
// Make sure the filters are what we expect to be after retrieving from cache:
- fltr := resLimits["RL1"].Filters[1]
+ fltr := resLimits[0].Filters[1]
if pass, _ := fltr.Pass(map[string]interface{}{"Subject": "10000001"}, "", nil); !pass {
- t.Errorf("Expecting RL: %+v, received: %+v", eResLimits["RL1"], resLimits["RL1"])
+ t.Errorf("Expecting RL: %+v, received: %+v", eResLimits[0], resLimits[0])
}
if pass, _ := fltr.Pass(map[string]interface{}{"Account": "1002"}, "", nil); pass {
- t.Errorf("Expecting RL: %+v, received: %+v", eResLimits["RL1"], resLimits["RL1"])
+ t.Errorf("Expecting RL: %+v, received: %+v", eResLimits[0], resLimits[0])
}
-
}
}
@@ -344,12 +339,12 @@ func TestRLsV1ResourceLimitsForEvent(t *testing.T) {
func TestRLsV1InitiateResourceUsage(t *testing.T) {
attrRU := utils.AttrRLsResourceUsage{
- ResourceUsageID: "651a8db2-4f67-4cf8-b622-169e8a482e50",
- Event: map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"},
- RequestedUnits: 1,
+ UsageID: "651a8db2-4f67-4cf8-b622-169e8a482e50",
+ Event: map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"},
+ Units: 1,
}
var reply string
- if err := rLS.V1InitiateResourceUsage(attrRU, &reply); err != nil {
+ if err := rLS.V1AllocateResource(attrRU, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Error("Received reply: ", reply)
@@ -359,21 +354,21 @@ func TestRLsV1InitiateResourceUsage(t *testing.T) {
t.Error(err)
} else if len(resLimits) != 2 {
t.Errorf("Received: %+v", resLimits)
- } else if resLimits["RL1"].UsedUnits() != 1 {
- t.Errorf("RL1: %+v", resLimits["RL1"])
- } else if _, hasKey := resLimits["RL1"].Usage[attrRU.ResourceUsageID]; !hasKey {
- t.Errorf("RL1: %+v", resLimits["RL1"])
+ } else if resLimits[0].UsedUnits() != 1 {
+ t.Errorf("RL1: %+v", resLimits[0])
+ } else if _, hasKey := resLimits[0].Usage[attrRU.UsageID]; !hasKey {
+ t.Errorf("RL1: %+v", resLimits[0])
}
}
func TestRLsV1TerminateResourceUsage(t *testing.T) {
attrRU := utils.AttrRLsResourceUsage{
- ResourceUsageID: "651a8db2-4f67-4cf8-b622-169e8a482e50",
- Event: map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"},
- RequestedUnits: 1,
+ UsageID: "651a8db2-4f67-4cf8-b622-169e8a482e50",
+ Event: map[string]interface{}{"Account": "1002", "Subject": "dan", "Destination": "1002"},
+ Units: 1,
}
var reply string
- if err := rLS.V1TerminateResourceUsage(attrRU, &reply); err != nil {
+ if err := rLS.V1ReleaseResource(attrRU, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Error("Received reply: ", reply)
@@ -383,9 +378,9 @@ func TestRLsV1TerminateResourceUsage(t *testing.T) {
t.Error(err)
} else if len(resLimits) != 2 {
t.Errorf("Received: %+v", resLimits)
- } else if resLimits["RL1"].UsedUnits() != 0 {
- t.Errorf("RL1: %+v", resLimits["RL1"])
- } else if _, hasKey := resLimits["RL1"].Usage[attrRU.ResourceUsageID]; hasKey {
- t.Errorf("RL1: %+v", resLimits["RL1"])
+ } else if resLimits[0].UsedUnits() != 0 {
+ t.Errorf("RL1: %+v", resLimits[0])
+ } else if _, hasKey := resLimits[0].Usage[attrRU.UsageID]; hasKey {
+ t.Errorf("RL1: %+v", resLimits[0])
}
}
diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go
index b493508d0..702f822dc 100644
--- a/engine/tpimporter_csv.go
+++ b/engine/tpimporter_csv.go
@@ -56,6 +56,7 @@ var fileHandlers = map[string]func(*TPCSVImporter, string) error{
utils.LCRS_CSV: (*TPCSVImporter).importLcrs,
utils.USERS_CSV: (*TPCSVImporter).importUsers,
utils.ALIASES_CSV: (*TPCSVImporter).importAliases,
+ utils.ResourceLimitsCsv: (*TPCSVImporter).importResourceLimits,
}
func (self *TPCSVImporter) Run() error {
@@ -345,3 +346,14 @@ func (self *TPCSVImporter) importAliases(fn string) error {
}
return self.StorDb.SetTPAliases(tps)
}
+
+func (self *TPCSVImporter) importResourceLimits(fn string) error {
+ if self.Verbose {
+ log.Printf("Processing file: <%s> ", fn)
+ }
+ rls, err := self.csvr.GetTPResourceLimits(self.TPid, "")
+ if err != nil {
+ return err
+ }
+ return self.StorDb.SetTPResourceLimits(rls)
+}
diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go
index 1219a0860..aa4948909 100644
--- a/sessionmanager/fssessionmanager.go
+++ b/sessionmanager/fssessionmanager.go
@@ -188,11 +188,11 @@ func (sm *FSSessionManager) onChannelPark(ev engine.Event, connId string) {
if sm.rls != nil {
var reply string
attrRU := utils.AttrRLsResourceUsage{
- ResourceUsageID: ev.GetUUID(),
- Event: ev.(FSEvent).AsMapStringInterface(sm.timezone),
- RequestedUnits: 1,
+ UsageID: ev.GetUUID(),
+ Event: ev.(FSEvent).AsMapStringInterface(sm.timezone),
+ Units: 1,
}
- if err := sm.rls.Call("RLsV1.InitiateResourceUsage", attrRU, &reply); err != nil {
+ if err := sm.rls.Call("RLsV1.AllocateResource", attrRU, &reply); err != nil {
if err.Error() == utils.ErrResourceUnavailable.Error() {
sm.unparkCall(ev.GetUUID(), connId, ev.GetCallDestNr(utils.META_DEFAULT), "-"+utils.ErrResourceUnavailable.Error())
} else {
@@ -251,12 +251,12 @@ func (sm *FSSessionManager) onChannelHangupComplete(ev engine.Event) {
}
var reply string
attrRU := utils.AttrRLsResourceUsage{
- ResourceUsageID: ev.GetUUID(),
- Event: ev.(FSEvent).AsMapStringInterface(sm.timezone),
- RequestedUnits: 1,
+ UsageID: ev.GetUUID(),
+ Event: ev.(FSEvent).AsMapStringInterface(sm.timezone),
+ Units: 1,
}
if sm.rls != nil {
- if err := sm.rls.Call("RLsV1.TerminateResourceUsage", attrRU, &reply); err != nil {
+ if err := sm.rls.Call("RLsV1.ReleaseResource", attrRU, &reply); err != nil {
utils.Logger.Err(fmt.Sprintf(" RLs API error: %s", err.Error()))
}
}
diff --git a/utils/apitpdata.go b/utils/apitpdata.go
index f99ad9cab..8253524c7 100644
--- a/utils/apitpdata.go
+++ b/utils/apitpdata.go
@@ -1283,9 +1283,9 @@ type AttrRLsCache struct {
}
type AttrRLsResourceUsage struct {
- ResourceUsageID string
- Event map[string]interface{}
- RequestedUnits float64
+ Event map[string]interface{}
+ UsageID string // ResourceUsage Identifier
+ Units float64
}
// Attributes to send on SessionDisconnect by SMG