From a28665fb2b78212dd49e05804ade87ebee5b6c02 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 1 Aug 2016 21:30:42 +0200 Subject: [PATCH] ResourceLimits loaded from .csv to redis, get, set remove methods in AccountingStorage interface --- engine/loader_csv_test.go | 6 ++-- engine/model_converters.go | 18 +++++++++++ engine/model_helpers.go | 6 ++-- engine/model_helpers_test.go | 6 ++-- engine/reqfilter.go | 59 +++++++++++++++++++--------------- engine/reslimiter.go | 1 + engine/storage_interface.go | 3 ++ engine/storage_map.go | 10 ++++++ engine/storage_mongo_datadb.go | 11 +++++++ engine/storage_redis.go | 40 +++++++++++++++++++++++ engine/storage_sql.go | 3 +- engine/tp_reader.go | 19 +++++++++-- engine/version.go | 13 ++++++-- utils/apitpdata.go | 2 +- utils/consts.go | 1 + utils/coreutils.go | 13 ++++++++ 16 files changed, 170 insertions(+), 41 deletions(-) diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index d3b94cb1c..42b7195cd 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -1312,8 +1312,8 @@ func TestLoadAliases(t *testing.T) { } func TestLoadResourceLimits(t *testing.T) { - eResLimits := map[string]*utils.TPResourceLimits{ - "ResGroup1": &utils.TPResourceLimits{ + eResLimits := map[string]*utils.TPResourceLimit{ + "ResGroup1": &utils.TPResourceLimit{ TPID: testTPID, ID: "ResGroup1", Filters: []*utils.TPRequestFilter{ @@ -1326,7 +1326,7 @@ func TestLoadResourceLimits(t *testing.T) { Weight: 10, Limit: "2", }, - "ResGroup2": &utils.TPResourceLimits{ + "ResGroup2": &utils.TPResourceLimit{ TPID: testTPID, ID: "ResGroup2", Filters: []*utils.TPRequestFilter{ diff --git a/engine/model_converters.go b/engine/model_converters.go index f15e99abd..e869ba856 100644 --- a/engine/model_converters.go +++ b/engine/model_converters.go @@ -409,3 +409,21 @@ func APItoModelUsers(attr *utils.TPUsers) (result []TpUser) { } return } + +func APItoResourceLimit(tpRL *utils.TPResourceLimit, timezone string) (rl *ResourceLimit, err error) { + rl = &ResourceLimit{ID: tpRL.ID, Weight: tpRL.Weight, Filters: make([]*RequestFilter, len(tpRL.Filters))} + for i, tpFltr := range tpRL.Filters { + rf := &RequestFilter{Type: tpFltr.Type, FieldName: tpFltr.FieldName, Values: tpFltr.Values} + if err := rf.CompileValues(); err != nil { + return nil, err + } + rl.Filters[i] = rf + } + if rl.ActivationTime, err = utils.ParseTimeDetectLayout(tpRL.ActivationTime, timezone); err != nil { + return nil, err + } + if rl.Limit, err = strconv.ParseFloat(tpRL.Limit, 64); err != nil { + return nil, err + } + return rl, nil +} diff --git a/engine/model_helpers.go b/engine/model_helpers.go index b42aebcfd..1374a04f9 100644 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -814,12 +814,12 @@ func (tps TpLcrRules) GetLcrRules() (map[string]*utils.TPLcrRules, error) { type TpResourceLimits []*TpResourceLimit // Converts model received from StorDB or .csv into API format (optimized version for TP) -func (tps TpResourceLimits) AsTPResourceLimits() map[string]*utils.TPResourceLimits { - resLimits := make(map[string]*utils.TPResourceLimits) +func (tps TpResourceLimits) AsTPResourceLimits() map[string]*utils.TPResourceLimit { + resLimits := make(map[string]*utils.TPResourceLimit) for _, tp := range tps { resLimit, found := resLimits[tp.Tag] if !found { - resLimit = &utils.TPResourceLimits{ + resLimit = &utils.TPResourceLimit{ TPID: tp.Tpid, ID: tp.Tag, ActivationTime: tp.ActivationTime, diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index 7e2192fb8..4e4e1522c 100644 --- a/engine/model_helpers_test.go +++ b/engine/model_helpers_test.go @@ -667,8 +667,8 @@ func TestTpResourceLimitsAsTPResourceLimits(t *testing.T) { Weight: 10.0, Limit: "20"}, } - eTPs := map[string]*utils.TPResourceLimits{ - tps[0].Tag: &utils.TPResourceLimits{ + eTPs := map[string]*utils.TPResourceLimit{ + tps[0].Tag: &utils.TPResourceLimit{ TPID: tps[0].Tpid, ID: tps[0].Tag, Filters: []*utils.TPRequestFilter{ @@ -688,7 +688,7 @@ func TestTpResourceLimitsAsTPResourceLimits(t *testing.T) { Limit: tps[0].Limit, ActionTriggerIDs: []string{"WARN_RES1", "WARN_RES2", "WARN3"}, }, - tps[2].Tag: &utils.TPResourceLimits{ + tps[2].Tag: &utils.TPResourceLimit{ TPID: tps[2].Tpid, ID: tps[2].Tag, Filters: []*utils.TPRequestFilter{ diff --git a/engine/reqfilter.go b/engine/reqfilter.go index bad8dd2c3..41181bd02 100644 --- a/engine/reqfilter.go +++ b/engine/reqfilter.go @@ -49,32 +49,9 @@ func NewRequestFilter(rfType, fieldName string, vals []string) (*RequestFilter, if len(vals) == 0 && utils.IsSliceMember([]string{MetaStringPrefix, MetaTimings, MetaRSRFields, MetaDestinations, MetaDestinations}, rfType) { return nil, fmt.Errorf("Values is mandatory for Type: %s", rfType) } - rf := &RequestFilter{Type: rfType, FieldName: fieldName, Values: vals, cdrStatSThresholds: make([]*RFStatSThreshold, len(vals))} - if rfType == MetaCDRStats { - for i, val := range vals { - valSplt := strings.Split(val, utils.InInFieldSep) - if len(valSplt) != 3 { - return nil, fmt.Errorf("Value %s needs to contain at least 3 items", val) - } - st := &RFStatSThreshold{QueueID: valSplt[0], ThresholdType: strings.ToUpper(valSplt[1])} - if len(st.ThresholdType) < len(MetaMinCapPrefix)+1 { - return nil, fmt.Errorf("Value %s contains a unsupported ThresholdType format", val) - } else if !strings.HasPrefix(st.ThresholdType, MetaMinCapPrefix) && !strings.HasPrefix(st.ThresholdType, MetaMaxCapPrefix) { - return nil, fmt.Errorf("Value %s contains unsupported ThresholdType prefix", val) - } - if tv, err := strconv.ParseFloat(valSplt[2], 64); err != nil { - return nil, err - } else { - st.ThresholdValue = tv - } - rf.cdrStatSThresholds[i] = st - } - } - if rfType == MetaRSRFields { - var err error - if rf.rsrFields, err = utils.ParseRSRFieldsFromSlice(vals); err != nil { - return nil, err - } + rf := &RequestFilter{Type: rfType, FieldName: fieldName, Values: vals} + if err := rf.CompileValues(); err != nil { + return nil, err } return rf, nil } @@ -95,6 +72,36 @@ type RequestFilter struct { cdrStatSThresholds []*RFStatSThreshold // Cached compiled RFStatsThreshold out of Values } +// Separate method to compile RSR fields +func (rf *RequestFilter) CompileValues() (err error) { + if rf.Type == MetaRSRFields { + if rf.rsrFields, err = utils.ParseRSRFieldsFromSlice(rf.Values); err != nil { + return err + } + } else if rf.Type == MetaCDRStats { + rf.cdrStatSThresholds = make([]*RFStatSThreshold, len(rf.Values)) + for i, val := range rf.Values { + valSplt := strings.Split(val, utils.InInFieldSep) + if len(valSplt) != 3 { + return fmt.Errorf("Value %s needs to contain at least 3 items", val) + } + st := &RFStatSThreshold{QueueID: valSplt[0], ThresholdType: strings.ToUpper(valSplt[1])} + if len(st.ThresholdType) < len(MetaMinCapPrefix)+1 { + return fmt.Errorf("Value %s contains a unsupported ThresholdType format", val) + } else if !strings.HasPrefix(st.ThresholdType, MetaMinCapPrefix) && !strings.HasPrefix(st.ThresholdType, MetaMaxCapPrefix) { + return fmt.Errorf("Value %s contains unsupported ThresholdType prefix", val) + } + if tv, err := strconv.ParseFloat(valSplt[2], 64); err != nil { + return err + } else { + st.ThresholdValue = tv + } + rf.cdrStatSThresholds[i] = st + } + } + return nil +} + // Pass is the method which should be used from outside. func (fltr *RequestFilter) Pass(req interface{}, extraFieldsLabel string, cdrStats rpcclient.RpcClientConnection) (bool, error) { switch fltr.Type { diff --git a/engine/reslimiter.go b/engine/reslimiter.go index be00519b9..40ffe1e20 100644 --- a/engine/reslimiter.go +++ b/engine/reslimiter.go @@ -35,6 +35,7 @@ type ResourceLimit struct { Weight float64 // Weight to sort the ResourceLimits Limit float64 // Limit value ActionTriggers ActionTriggers // Thresholds to check after changing Limit + Used utils.Int64Slice // []time.Time.Unix() - keep it in this format so we can expire usage automatically } // ResourcesLimiter is the service handling channel limits diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 19eed4a94..63b5914c4 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -92,6 +92,9 @@ type AccountingStorage interface { SetAlias(*Alias) error GetAlias(string, bool) (*Alias, error) RemoveAlias(string) error + GetResourceLimit(string, bool) (*ResourceLimit, error) + SetResourceLimit(*ResourceLimit) error + RemoveResourceLimit(string) error GetLoadHistory(int, bool) ([]*utils.LoadInstance, error) AddLoadHistory(*utils.LoadInstance, int) error GetStructVersion() (*StructVersion, error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 8dc7db777..eb6093a11 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -986,3 +986,13 @@ func (ms *MapStorage) GetStructVersion() (rsv *StructVersion, err error) { } return } + +func (ms *MapStorage) GetResourceLimit(id string, skipCache bool) (*ResourceLimit, error) { + return nil, nil +} +func (ms *MapStorage) SetResourceLimit(rl *ResourceLimit) error { + return nil +} +func (ms *MapStorage) RemoveResourceLimit(id string) error { + return nil +} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index a57b99ba2..eae688dcb 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -52,6 +52,7 @@ const ( colLht = "load_history" colLogErr = "error_logs" colVer = "versions" + colRL = "resource_limits" ) var ( @@ -1587,3 +1588,13 @@ func (ms *MongoStorage) GetStructVersion() (rsv *StructVersion, err error) { rsv = &result.Value return } + +func (ms *MongoStorage) GetResourceLimit(id string, skipCache bool) (*ResourceLimit, error) { + return nil, nil +} +func (ms *MongoStorage) SetResourceLimit(rl *ResourceLimit) error { + return nil +} +func (ms *MongoStorage) RemoveResourceLimit(id string) error { + return nil +} diff --git a/engine/storage_redis.go b/engine/storage_redis.go index f67296c3e..796848bb3 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1213,3 +1213,43 @@ func (rs *RedisStorage) GetStructVersion() (rsv *StructVersion, err error) { } return } + +func (rs *RedisStorage) GetResourceLimit(id string, skipCache bool) (rl *ResourceLimit, err error) { + key := utils.ResourceLimitsPrefix + id + if !skipCache { + if x, err := CacheGet(key); err == nil { + return x.(*ResourceLimit), nil + } else { + return nil, err + } + } + var values []byte + if values, err = rs.db.Cmd("GET", key).Bytes(); err == nil { + err = rs.ms.Unmarshal(values, &rl) + for _, fltr := range rl.Filters { + if err := fltr.CompileValues(); err != nil { + return nil, err + } + } + CacheSet(key, rl) + } + return +} +func (rs *RedisStorage) SetResourceLimit(rl *ResourceLimit) error { + result, err := rs.ms.Marshal(rl) + if err != nil { + return err + } + key := utils.ResourceLimitsPrefix + rl.ID + err = rs.db.Cmd("SET", key, result).Err + CacheSet(key, rl) + return err +} +func (rs *RedisStorage) RemoveResourceLimit(id string) error { + key := utils.ResourceLimitsPrefix + id + if err := rs.db.Cmd("DEL", key).Err; err != nil { + return err + } + CacheRemKey(key) + return nil +} diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 1368cf42d..e8fce7e51 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -169,7 +169,8 @@ func (self *SQLStorage) RemTpData(table, tpid string, args map[string]string) er tx := self.db.Begin() if len(table) == 0 { // Remove tpid out of all tables for _, tblName := range []string{utils.TBL_TP_TIMINGS, utils.TBL_TP_DESTINATIONS, utils.TBL_TP_RATES, utils.TBL_TP_DESTINATION_RATES, utils.TBL_TP_RATING_PLANS, utils.TBL_TP_RATE_PROFILES, - utils.TBL_TP_SHARED_GROUPS, utils.TBL_TP_CDR_STATS, utils.TBL_TP_LCRS, utils.TBL_TP_ACTIONS, utils.TBL_TP_ACTION_PLANS, utils.TBL_TP_ACTION_TRIGGERS, utils.TBL_TP_ACCOUNT_ACTIONS, utils.TBL_TP_DERIVED_CHARGERS, utils.TBL_TP_ALIASES} { + utils.TBL_TP_SHARED_GROUPS, utils.TBL_TP_CDR_STATS, utils.TBL_TP_LCRS, utils.TBL_TP_ACTIONS, utils.TBL_TP_ACTION_PLANS, utils.TBL_TP_ACTION_TRIGGERS, utils.TBL_TP_ACCOUNT_ACTIONS, + utils.TBL_TP_DERIVED_CHARGERS, utils.TBL_TP_ALIASES, utils.TBLTPResourceLimits} { if err := tx.Table(tblName).Where("tpid = ?", tpid).Delete(nil).Error; err != nil { tx.Rollback() return err diff --git a/engine/tp_reader.go b/engine/tp_reader.go index bcb77a688..26381ed6f 100644 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -35,7 +35,7 @@ type TpReader struct { cdrStats map[string]*CdrStats users map[string]*UserProfile aliases map[string]*Alias - resLimits map[string]*utils.TPResourceLimits + resLimits map[string]*utils.TPResourceLimit } func NewTpReader(rs RatingStorage, as AccountingStorage, lr LoadReader, tpid, timezone string) *TpReader { @@ -86,7 +86,7 @@ func (tpr *TpReader) Init() { tpr.users = make(map[string]*UserProfile) tpr.aliases = make(map[string]*Alias) tpr.derivedChargers = make(map[string]*utils.DerivedChargers) - tpr.resLimits = make(map[string]*utils.TPResourceLimits) + tpr.resLimits = make(map[string]*utils.TPResourceLimit) } func (tpr *TpReader) LoadDestinationsFiltered(tag string) (bool, error) { @@ -1823,6 +1823,21 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print("\t", al.GetId()) } } + if verbose { + log.Print("ResourceLimits:") + } + for _, tpRL := range tpr.resLimits { + rl, err := APItoResourceLimit(tpRL, tpr.timezone) + if err != nil { + return err + } + if err = tpr.accountingStorage.SetResourceLimit(rl); err != nil { + return err + } + if verbose { + log.Print("\t", rl.ID) + } + } return } diff --git a/engine/version.go b/engine/version.go index 6d69f6f6b..6d851f86a 100644 --- a/engine/version.go +++ b/engine/version.go @@ -56,6 +56,7 @@ var ( LoadHistory: "1", Cdrs: "1", SMCosts: "1", + ResourceLimits: "1", } ) @@ -78,8 +79,9 @@ type StructVersion struct { PubSubs string LoadHistory string // cdr - Cdrs string - SMCosts string + Cdrs string + SMCosts string + ResourceLimits string } type MigrationInfo struct { @@ -210,5 +212,12 @@ func (sv *StructVersion) CompareAndMigrate(dbVer *StructVersion) []*MigrationInf CurrentVersion: CurrentVersion.SMCosts, }) } + if sv.ResourceLimits != dbVer.ResourceLimits { + migrationInfoList = append(migrationInfoList, &MigrationInfo{ + Prefix: utils.ResourceLimitsPrefix, + DbVersion: dbVer.ResourceLimits, + CurrentVersion: CurrentVersion.ResourceLimits, + }) + } return migrationInfoList } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 4d1738767..cd7c0bc7a 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1200,7 +1200,7 @@ type AttrSetBalance struct { Disabled *bool } -type TPResourceLimits struct { +type TPResourceLimit struct { TPID string ID string // Identifier of this limit Filters []*TPRequestFilter // Filters for the request diff --git a/utils/consts.go b/utils/consts.go index e8fc28e52..97fe1b042 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -200,6 +200,7 @@ const ( PUBSUB_SUBSCRIBERS_PREFIX = "pss_" USERS_PREFIX = "usr_" ALIASES_PREFIX = "als_" + ResourceLimitsPrefix = "rl_" REVERSE_ALIASES_PREFIX = "rls_" CDR_STATS_PREFIX = "cst_" TEMP_DESTINATION_PREFIX = "tmp_" diff --git a/utils/coreutils.go b/utils/coreutils.go index b2a52e096..904c9d220 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -603,3 +603,16 @@ func MaskSuffix(dest string, maskLen int) string { } return dest } + +// Sortable Int64Slice +type Int64Slice []int64 + +func (slc Int64Slice) Len() int { + return len(slc) +} +func (slc Int64Slice) Swap(i, j int) { + slc[i], slc[j] = slc[j], slc[i] +} +func (slc Int64Slice) Less(i, j int) bool { + return slc[i] < slc[j] +}