ResourceLimiterService with Call method for RPC, RequestFilter constructor modification so we can pass CDRSTatS on check

This commit is contained in:
DanB
2016-08-01 13:45:21 +02:00
parent 0532732aee
commit 956c9f56ce
4 changed files with 68 additions and 24 deletions

View File

@@ -1,5 +1,6 @@
#Id,FilterType,FilterFieldName,FilterValues,ActivationTime,Weight,Limit,ActionTriggers
ResGroup1,*string_prefix,Destination,1001;1002,2014-07-29T15:00:00Z,10,2,
ResGroup1,*string,Account,1001;1002,2014-07-29T15:00:00Z,10,2,
ResGroup1,*string_prefix,Destination,10;20,2014-07-29T15:00:00Z,10,,
ResGroup1,*cdr_stats,,CDRST1:*min_ASR:34;CDRST_1001:*min_ASR:20,,,,
ResGroup1,*rsr_fields,,Subject(~^1.*1$);Destination(1002),,,,
ResGroup2,*destinations,Destination,DST_FS,2014-07-29T15:00:00Z,10,2,
1 #Id FilterType FilterFieldName FilterValues ActivationTime Weight Limit ActionTriggers
2 ResGroup1 *string_prefix *string Destination Account 1001;1002 2014-07-29T15:00:00Z 10 2
3 ResGroup1 *string_prefix Destination 10;20 2014-07-29T15:00:00Z 10
4 ResGroup1 *cdr_stats CDRST1:*min_ASR:34;CDRST_1001:*min_ASR:20
5 ResGroup1 *rsr_fields Subject(~^1.*1$);Destination(1002)
6 ResGroup2 *destinations Destination DST_FS 2014-07-29T15:00:00Z 10 2

View File

@@ -39,7 +39,7 @@ const (
MetaMaxCapPrefix = "*MAX_"
)
func NewRequestFilter(rfType, fieldName string, vals []string, cdrStats rpcclient.RpcClientConnection) (*RequestFilter, error) {
func NewRequestFilter(rfType, fieldName string, vals []string) (*RequestFilter, error) {
if !utils.IsSliceMember([]string{MetaStringPrefix, MetaTimings, MetaRSRFields, MetaCDRStats, MetaDestinations}, rfType) {
return nil, fmt.Errorf("Unsupported filter Type: %s", rfType)
}
@@ -49,11 +49,8 @@ func NewRequestFilter(rfType, fieldName string, vals []string, cdrStats rpcclien
if len(vals) == 0 && utils.IsSliceMember([]string{MetaStringPrefix, MetaTimings, MetaRSRFields, MetaDestinations, MetaDestinations}, rfType) {
return nil, fmt.Errorf("Values is mandatory for Type: %s", rfType)
}
rf := &RequestFilter{Type: rfType, FieldName: fieldName, Values: vals, cdrStats: cdrStats, cdrStatSThresholds: make([]*RFStatSThreshold, len(vals))}
rf := &RequestFilter{Type: rfType, FieldName: fieldName, Values: vals, cdrStatSThresholds: make([]*RFStatSThreshold, len(vals))}
if rfType == MetaCDRStats {
if cdrStats == nil {
return nil, errors.New("Missing cdrStats information")
}
for i, val := range vals {
valSplt := strings.Split(val, utils.InInFieldSep)
if len(valSplt) != 3 {
@@ -91,16 +88,15 @@ type RFStatSThreshold struct {
// RequestFilter filters requests coming into various places
// Pass rule: default negative, one mathing rule should pass the filter
type RequestFilter struct {
Type string // Filter type (*string, *timing, *rsr_filters, *cdr_stats)
FieldName string // Name of the field providing us the Values to check (used in case of some )
Values []string // Filter definition
rsrFields utils.RSRFields // Cache here the RSRFilter Values
cdrStats rpcclient.RpcClientConnection // Connection towards CDRStats service (eg: for *cdr_stats type)
cdrStatSThresholds []*RFStatSThreshold // Cached compiled RFStatsThreshold out of Values
Type string // Filter type (*string, *timing, *rsr_filters, *cdr_stats)
FieldName string // Name of the field providing us the Values to check (used in case of some )
Values []string // Filter definition
rsrFields utils.RSRFields // Cache here the RSRFilter Values
cdrStatSThresholds []*RFStatSThreshold // Cached compiled RFStatsThreshold out of Values
}
// Pass is the method which should be used from outside.
func (fltr *RequestFilter) Pass(req interface{}, extraFieldsLabel string) (bool, error) {
func (fltr *RequestFilter) Pass(req interface{}, extraFieldsLabel string, cdrStats rpcclient.RpcClientConnection) (bool, error) {
switch fltr.Type {
case MetaString:
return fltr.passString(req, extraFieldsLabel)
@@ -113,7 +109,7 @@ func (fltr *RequestFilter) Pass(req interface{}, extraFieldsLabel string) (bool,
case MetaRSRFields:
return fltr.passRSRFields(req, extraFieldsLabel)
case MetaCDRStats:
return fltr.passCDRStats(req, extraFieldsLabel)
return fltr.passCDRStats(req, extraFieldsLabel, cdrStats)
default:
return false, utils.ErrNotImplemented
}
@@ -181,10 +177,13 @@ func (fltr *RequestFilter) passRSRFields(req interface{}, extraFieldsLabel strin
return false, nil
}
func (fltr *RequestFilter) passCDRStats(req interface{}, extraFieldsLabel string) (bool, error) {
func (fltr *RequestFilter) passCDRStats(req interface{}, extraFieldsLabel string, cdrStats rpcclient.RpcClientConnection) (bool, error) {
if cdrStats == nil {
return false, errors.New("Missing CDRStatS information")
}
for _, threshold := range fltr.cdrStatSThresholds {
statValues := make(map[string]float64)
if err := fltr.cdrStats.Call("CDRStatsV1.GetValues", threshold.QueueID, &statValues); err != nil {
if err := cdrStats.Call("CDRStatsV1.GetValues", threshold.QueueID, &statValues); err != nil {
return false, err
}
if val, hasIt := statValues[threshold.ThresholdType[len(MetaMinCapPrefix):]]; !hasIt {

View File

@@ -87,7 +87,7 @@ func TestPassRSRFields(t *testing.T) {
cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Subject: "dan", Destination: "+4986517174963",
TimeStart: time.Date(2013, time.October, 7, 14, 50, 0, 0, time.UTC), TimeEnd: time.Date(2013, time.October, 7, 14, 52, 12, 0, time.UTC),
DurationIndex: 132 * time.Second, ExtraFields: map[string]string{"navigation": "off"}}
rf, err := NewRequestFilter(MetaRSRFields, "", []string{"Tenant(~^cgr.*\\.org$)"}, nil)
rf, err := NewRequestFilter(MetaRSRFields, "", []string{"Tenant(~^cgr.*\\.org$)"})
if err != nil {
t.Error(err)
}
@@ -96,7 +96,7 @@ func TestPassRSRFields(t *testing.T) {
} else if !passes {
t.Error("Not passing")
}
rf, err = NewRequestFilter(MetaRSRFields, "", []string{"navigation(on)"}, nil)
rf, err = NewRequestFilter(MetaRSRFields, "", []string{"navigation(on)"})
if err != nil {
t.Error(err)
}
@@ -105,7 +105,7 @@ func TestPassRSRFields(t *testing.T) {
} else if passes {
t.Error("Passing")
}
rf, err = NewRequestFilter(MetaRSRFields, "", []string{"navigation(off)"}, nil)
rf, err = NewRequestFilter(MetaRSRFields, "", []string{"navigation(off)"})
if err != nil {
t.Error(err)
}
@@ -122,7 +122,7 @@ func TestPassDestinations(t *testing.T) {
cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Subject: "dan", Destination: "+4986517174963",
TimeStart: time.Date(2013, time.October, 7, 14, 50, 0, 0, time.UTC), TimeEnd: time.Date(2013, time.October, 7, 14, 52, 12, 0, time.UTC),
DurationIndex: 132 * time.Second, ExtraFields: map[string]string{"navigation": "off"}}
rf, err := NewRequestFilter(MetaDestinations, "Destination", []string{"DE"}, nil)
rf, err := NewRequestFilter(MetaDestinations, "Destination", []string{"DE"})
if err != nil {
t.Error(err)
}
@@ -131,7 +131,7 @@ func TestPassDestinations(t *testing.T) {
} else if !passes {
t.Error("Not passing")
}
rf, err = NewRequestFilter(MetaDestinations, "Destination", []string{"RO"}, nil)
rf, err = NewRequestFilter(MetaDestinations, "Destination", []string{"RO"})
if err != nil {
t.Error(err)
}
@@ -161,11 +161,11 @@ func TestPassCDRStats(t *testing.T) {
if err != nil {
t.Error("Error appending cdr to stats: ", err)
}
rf, err := NewRequestFilter(MetaCDRStats, "", []string{"CDRST1:*min_asr:20", "CDRST2:*min_acd:10"}, cdrStats)
rf, err := NewRequestFilter(MetaCDRStats, "", []string{"CDRST1:*min_asr:20", "CDRST2:*min_acd:10"})
if err != nil {
t.Fatal(err)
}
if passes, err := rf.passCDRStats(cd, "ExtraFields"); err != nil {
if passes, err := rf.passCDRStats(cd, "ExtraFields", cdrStats); err != nil {
t.Error(err)
} else if !passes {
t.Error("Not passing")

View File

@@ -19,7 +19,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"reflect"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/utils"
)
// ResourceLimit represents a limit imposed for accessing a resource (eg: new calls)
@@ -34,5 +39,44 @@ type ResourceLimit struct {
// ResourcesLimiter is the service handling channel limits
type ResourceLimiterService struct {
stringIndexes map[string]map[string]string // map[fieldName]map[fieldValue]resourceID
sync.RWMutex
stringIndexes map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue]utils.StringMap[resourceID]
}
// Called to start the service
func (rls *ResourceLimiterService) Start() error {
return nil
}
// Called to shutdown the service
func (rls *ResourceLimiterService) Shutdown() error {
return nil
}
// Make the service available as RPC internally
func (rls *ResourceLimiterService) Call(serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {
return utils.ErrNotImplemented
}
// get method
method := reflect.ValueOf(rls).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method
if !method.IsValid() {
return utils.ErrNotImplemented
}
// construct the params
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
ret := method.Call(params)
if len(ret) != 1 {
return utils.ErrServerError
}
if ret[0].Interface() == nil {
return nil
}
err, ok := ret[0].Interface().(error)
if !ok {
return utils.ErrServerError
}
return err
}