ResourceLimits loaded from .csv to redis, get, set remove methods in AccountingStorage interface

This commit is contained in:
DanB
2016-08-01 21:30:42 +02:00
parent d12c501ad6
commit a28665fb2b
16 changed files with 170 additions and 41 deletions

View File

@@ -1312,8 +1312,8 @@ func TestLoadAliases(t *testing.T) {
}
func TestLoadResourceLimits(t *testing.T) {
eResLimits := map[string]*utils.TPResourceLimits{
"ResGroup1": &utils.TPResourceLimits{
eResLimits := map[string]*utils.TPResourceLimit{
"ResGroup1": &utils.TPResourceLimit{
TPID: testTPID,
ID: "ResGroup1",
Filters: []*utils.TPRequestFilter{
@@ -1326,7 +1326,7 @@ func TestLoadResourceLimits(t *testing.T) {
Weight: 10,
Limit: "2",
},
"ResGroup2": &utils.TPResourceLimits{
"ResGroup2": &utils.TPResourceLimit{
TPID: testTPID,
ID: "ResGroup2",
Filters: []*utils.TPRequestFilter{

View File

@@ -409,3 +409,21 @@ func APItoModelUsers(attr *utils.TPUsers) (result []TpUser) {
}
return
}
func APItoResourceLimit(tpRL *utils.TPResourceLimit, timezone string) (rl *ResourceLimit, err error) {
rl = &ResourceLimit{ID: tpRL.ID, Weight: tpRL.Weight, Filters: make([]*RequestFilter, len(tpRL.Filters))}
for i, tpFltr := range tpRL.Filters {
rf := &RequestFilter{Type: tpFltr.Type, FieldName: tpFltr.FieldName, Values: tpFltr.Values}
if err := rf.CompileValues(); err != nil {
return nil, err
}
rl.Filters[i] = rf
}
if rl.ActivationTime, err = utils.ParseTimeDetectLayout(tpRL.ActivationTime, timezone); err != nil {
return nil, err
}
if rl.Limit, err = strconv.ParseFloat(tpRL.Limit, 64); err != nil {
return nil, err
}
return rl, nil
}

View File

@@ -814,12 +814,12 @@ func (tps TpLcrRules) GetLcrRules() (map[string]*utils.TPLcrRules, error) {
type TpResourceLimits []*TpResourceLimit
// Converts model received from StorDB or .csv into API format (optimized version for TP)
func (tps TpResourceLimits) AsTPResourceLimits() map[string]*utils.TPResourceLimits {
resLimits := make(map[string]*utils.TPResourceLimits)
func (tps TpResourceLimits) AsTPResourceLimits() map[string]*utils.TPResourceLimit {
resLimits := make(map[string]*utils.TPResourceLimit)
for _, tp := range tps {
resLimit, found := resLimits[tp.Tag]
if !found {
resLimit = &utils.TPResourceLimits{
resLimit = &utils.TPResourceLimit{
TPID: tp.Tpid,
ID: tp.Tag,
ActivationTime: tp.ActivationTime,

View File

@@ -667,8 +667,8 @@ func TestTpResourceLimitsAsTPResourceLimits(t *testing.T) {
Weight: 10.0,
Limit: "20"},
}
eTPs := map[string]*utils.TPResourceLimits{
tps[0].Tag: &utils.TPResourceLimits{
eTPs := map[string]*utils.TPResourceLimit{
tps[0].Tag: &utils.TPResourceLimit{
TPID: tps[0].Tpid,
ID: tps[0].Tag,
Filters: []*utils.TPRequestFilter{
@@ -688,7 +688,7 @@ func TestTpResourceLimitsAsTPResourceLimits(t *testing.T) {
Limit: tps[0].Limit,
ActionTriggerIDs: []string{"WARN_RES1", "WARN_RES2", "WARN3"},
},
tps[2].Tag: &utils.TPResourceLimits{
tps[2].Tag: &utils.TPResourceLimit{
TPID: tps[2].Tpid,
ID: tps[2].Tag,
Filters: []*utils.TPRequestFilter{

View File

@@ -49,32 +49,9 @@ func NewRequestFilter(rfType, fieldName string, vals []string) (*RequestFilter,
if len(vals) == 0 && utils.IsSliceMember([]string{MetaStringPrefix, MetaTimings, MetaRSRFields, MetaDestinations, MetaDestinations}, rfType) {
return nil, fmt.Errorf("Values is mandatory for Type: %s", rfType)
}
rf := &RequestFilter{Type: rfType, FieldName: fieldName, Values: vals, cdrStatSThresholds: make([]*RFStatSThreshold, len(vals))}
if rfType == MetaCDRStats {
for i, val := range vals {
valSplt := strings.Split(val, utils.InInFieldSep)
if len(valSplt) != 3 {
return nil, fmt.Errorf("Value %s needs to contain at least 3 items", val)
}
st := &RFStatSThreshold{QueueID: valSplt[0], ThresholdType: strings.ToUpper(valSplt[1])}
if len(st.ThresholdType) < len(MetaMinCapPrefix)+1 {
return nil, fmt.Errorf("Value %s contains a unsupported ThresholdType format", val)
} else if !strings.HasPrefix(st.ThresholdType, MetaMinCapPrefix) && !strings.HasPrefix(st.ThresholdType, MetaMaxCapPrefix) {
return nil, fmt.Errorf("Value %s contains unsupported ThresholdType prefix", val)
}
if tv, err := strconv.ParseFloat(valSplt[2], 64); err != nil {
return nil, err
} else {
st.ThresholdValue = tv
}
rf.cdrStatSThresholds[i] = st
}
}
if rfType == MetaRSRFields {
var err error
if rf.rsrFields, err = utils.ParseRSRFieldsFromSlice(vals); err != nil {
return nil, err
}
rf := &RequestFilter{Type: rfType, FieldName: fieldName, Values: vals}
if err := rf.CompileValues(); err != nil {
return nil, err
}
return rf, nil
}
@@ -95,6 +72,36 @@ type RequestFilter struct {
cdrStatSThresholds []*RFStatSThreshold // Cached compiled RFStatsThreshold out of Values
}
// Separate method to compile RSR fields
func (rf *RequestFilter) CompileValues() (err error) {
if rf.Type == MetaRSRFields {
if rf.rsrFields, err = utils.ParseRSRFieldsFromSlice(rf.Values); err != nil {
return err
}
} else if rf.Type == MetaCDRStats {
rf.cdrStatSThresholds = make([]*RFStatSThreshold, len(rf.Values))
for i, val := range rf.Values {
valSplt := strings.Split(val, utils.InInFieldSep)
if len(valSplt) != 3 {
return fmt.Errorf("Value %s needs to contain at least 3 items", val)
}
st := &RFStatSThreshold{QueueID: valSplt[0], ThresholdType: strings.ToUpper(valSplt[1])}
if len(st.ThresholdType) < len(MetaMinCapPrefix)+1 {
return fmt.Errorf("Value %s contains a unsupported ThresholdType format", val)
} else if !strings.HasPrefix(st.ThresholdType, MetaMinCapPrefix) && !strings.HasPrefix(st.ThresholdType, MetaMaxCapPrefix) {
return fmt.Errorf("Value %s contains unsupported ThresholdType prefix", val)
}
if tv, err := strconv.ParseFloat(valSplt[2], 64); err != nil {
return err
} else {
st.ThresholdValue = tv
}
rf.cdrStatSThresholds[i] = st
}
}
return nil
}
// Pass is the method which should be used from outside.
func (fltr *RequestFilter) Pass(req interface{}, extraFieldsLabel string, cdrStats rpcclient.RpcClientConnection) (bool, error) {
switch fltr.Type {

View File

@@ -35,6 +35,7 @@ type ResourceLimit struct {
Weight float64 // Weight to sort the ResourceLimits
Limit float64 // Limit value
ActionTriggers ActionTriggers // Thresholds to check after changing Limit
Used utils.Int64Slice // []time.Time.Unix() - keep it in this format so we can expire usage automatically
}
// ResourcesLimiter is the service handling channel limits

View File

@@ -92,6 +92,9 @@ type AccountingStorage interface {
SetAlias(*Alias) error
GetAlias(string, bool) (*Alias, error)
RemoveAlias(string) error
GetResourceLimit(string, bool) (*ResourceLimit, error)
SetResourceLimit(*ResourceLimit) error
RemoveResourceLimit(string) error
GetLoadHistory(int, bool) ([]*utils.LoadInstance, error)
AddLoadHistory(*utils.LoadInstance, int) error
GetStructVersion() (*StructVersion, error)

View File

@@ -986,3 +986,13 @@ func (ms *MapStorage) GetStructVersion() (rsv *StructVersion, err error) {
}
return
}
func (ms *MapStorage) GetResourceLimit(id string, skipCache bool) (*ResourceLimit, error) {
return nil, nil
}
func (ms *MapStorage) SetResourceLimit(rl *ResourceLimit) error {
return nil
}
func (ms *MapStorage) RemoveResourceLimit(id string) error {
return nil
}

View File

@@ -52,6 +52,7 @@ const (
colLht = "load_history"
colLogErr = "error_logs"
colVer = "versions"
colRL = "resource_limits"
)
var (
@@ -1587,3 +1588,13 @@ func (ms *MongoStorage) GetStructVersion() (rsv *StructVersion, err error) {
rsv = &result.Value
return
}
func (ms *MongoStorage) GetResourceLimit(id string, skipCache bool) (*ResourceLimit, error) {
return nil, nil
}
func (ms *MongoStorage) SetResourceLimit(rl *ResourceLimit) error {
return nil
}
func (ms *MongoStorage) RemoveResourceLimit(id string) error {
return nil
}

View File

@@ -1213,3 +1213,43 @@ func (rs *RedisStorage) GetStructVersion() (rsv *StructVersion, err error) {
}
return
}
func (rs *RedisStorage) GetResourceLimit(id string, skipCache bool) (rl *ResourceLimit, err error) {
key := utils.ResourceLimitsPrefix + id
if !skipCache {
if x, err := CacheGet(key); err == nil {
return x.(*ResourceLimit), nil
} else {
return nil, err
}
}
var values []byte
if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil {
err = rs.ms.Unmarshal(values, &rl)
for _, fltr := range rl.Filters {
if err := fltr.CompileValues(); err != nil {
return nil, err
}
}
CacheSet(key, rl)
}
return
}
func (rs *RedisStorage) SetResourceLimit(rl *ResourceLimit) error {
result, err := rs.ms.Marshal(rl)
if err != nil {
return err
}
key := utils.ResourceLimitsPrefix + rl.ID
err = rs.db.Cmd("SET", key, result).Err
CacheSet(key, rl)
return err
}
func (rs *RedisStorage) RemoveResourceLimit(id string) error {
key := utils.ResourceLimitsPrefix + id
if err := rs.db.Cmd("DEL", key).Err; err != nil {
return err
}
CacheRemKey(key)
return nil
}

View File

@@ -169,7 +169,8 @@ func (self *SQLStorage) RemTpData(table, tpid string, args map[string]string) er
tx := self.db.Begin()
if len(table) == 0 { // Remove tpid out of all tables
for _, tblName := range []string{utils.TBL_TP_TIMINGS, utils.TBL_TP_DESTINATIONS, utils.TBL_TP_RATES, utils.TBL_TP_DESTINATION_RATES, utils.TBL_TP_RATING_PLANS, utils.TBL_TP_RATE_PROFILES,
utils.TBL_TP_SHARED_GROUPS, utils.TBL_TP_CDR_STATS, utils.TBL_TP_LCRS, utils.TBL_TP_ACTIONS, utils.TBL_TP_ACTION_PLANS, utils.TBL_TP_ACTION_TRIGGERS, utils.TBL_TP_ACCOUNT_ACTIONS, utils.TBL_TP_DERIVED_CHARGERS, utils.TBL_TP_ALIASES} {
utils.TBL_TP_SHARED_GROUPS, utils.TBL_TP_CDR_STATS, utils.TBL_TP_LCRS, utils.TBL_TP_ACTIONS, utils.TBL_TP_ACTION_PLANS, utils.TBL_TP_ACTION_TRIGGERS, utils.TBL_TP_ACCOUNT_ACTIONS,
utils.TBL_TP_DERIVED_CHARGERS, utils.TBL_TP_ALIASES, utils.TBLTPResourceLimits} {
if err := tx.Table(tblName).Where("tpid = ?", tpid).Delete(nil).Error; err != nil {
tx.Rollback()
return err

View File

@@ -35,7 +35,7 @@ type TpReader struct {
cdrStats map[string]*CdrStats
users map[string]*UserProfile
aliases map[string]*Alias
resLimits map[string]*utils.TPResourceLimits
resLimits map[string]*utils.TPResourceLimit
}
func NewTpReader(rs RatingStorage, as AccountingStorage, lr LoadReader, tpid, timezone string) *TpReader {
@@ -86,7 +86,7 @@ func (tpr *TpReader) Init() {
tpr.users = make(map[string]*UserProfile)
tpr.aliases = make(map[string]*Alias)
tpr.derivedChargers = make(map[string]*utils.DerivedChargers)
tpr.resLimits = make(map[string]*utils.TPResourceLimits)
tpr.resLimits = make(map[string]*utils.TPResourceLimit)
}
func (tpr *TpReader) LoadDestinationsFiltered(tag string) (bool, error) {
@@ -1823,6 +1823,21 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) {
log.Print("\t", al.GetId())
}
}
if verbose {
log.Print("ResourceLimits:")
}
for _, tpRL := range tpr.resLimits {
rl, err := APItoResourceLimit(tpRL, tpr.timezone)
if err != nil {
return err
}
if err = tpr.accountingStorage.SetResourceLimit(rl); err != nil {
return err
}
if verbose {
log.Print("\t", rl.ID)
}
}
return
}

View File

@@ -56,6 +56,7 @@ var (
LoadHistory: "1",
Cdrs: "1",
SMCosts: "1",
ResourceLimits: "1",
}
)
@@ -78,8 +79,9 @@ type StructVersion struct {
PubSubs string
LoadHistory string
// cdr
Cdrs string
SMCosts string
Cdrs string
SMCosts string
ResourceLimits string
}
type MigrationInfo struct {
@@ -210,5 +212,12 @@ func (sv *StructVersion) CompareAndMigrate(dbVer *StructVersion) []*MigrationInf
CurrentVersion: CurrentVersion.SMCosts,
})
}
if sv.ResourceLimits != dbVer.ResourceLimits {
migrationInfoList = append(migrationInfoList, &MigrationInfo{
Prefix: utils.ResourceLimitsPrefix,
DbVersion: dbVer.ResourceLimits,
CurrentVersion: CurrentVersion.ResourceLimits,
})
}
return migrationInfoList
}