Index for threshold filters

This commit is contained in:
DanB
2017-10-24 18:45:21 +02:00
parent 8dde242be7
commit 9d695d517c
4 changed files with 113 additions and 48 deletions

View File

@@ -80,6 +80,37 @@ func (rfi *ReqFilterIndexer) IndexFilters(itemID string, reqFltrs []*RequestFilt
return
}
// IndexFilters parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys
func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilter, itemID string) {
var hasMetaString bool
for _, fltr := range tpFltr.Filters {
if fltr.Type != MetaString {
continue
}
hasMetaString = true // Mark that we found at least one metatring so we don't index globally
if _, hastIt := rfi.indexes[fltr.FieldName]; !hastIt {
rfi.indexes[fltr.FieldName] = make(map[string]utils.StringMap)
}
for _, fldVal := range fltr.Values {
if _, hasIt := rfi.indexes[fltr.FieldName][fldVal]; !hasIt {
rfi.indexes[fltr.FieldName][fldVal] = make(utils.StringMap)
}
rfi.indexes[fltr.FieldName][fldVal][itemID] = true
rfi.chngdIndxKeys[utils.ConcatenatedKey(fltr.FieldName, fldVal)] = true
}
}
if !hasMetaString {
if _, hasIt := rfi.indexes[utils.NOT_AVAILABLE]; !hasIt {
rfi.indexes[utils.NOT_AVAILABLE] = make(map[string]utils.StringMap)
}
if _, hasIt := rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE]; !hasIt {
rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE] = make(utils.StringMap)
}
rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE][itemID] = true // Fields without real field index will be located in map[NOT_AVAILABLE][NOT_AVAILABLE][rl.ID]
}
return
}
// StoreIndexes handles storing the indexes to dataDB
func (rfi *ReqFilterIndexer) StoreIndexes() error {
return rfi.dm.DataDB().SetReqFilterIndexes(rfi.dbKey, rfi.indexes)

View File

@@ -2419,3 +2419,28 @@ func APItoFilter(tpTH *utils.TPFilter, timezone string) (th *Filter, err error)
}
return th, nil
}
func FilterToTPFilter(f *Filter) (tpFltr *utils.TPFilter) {
tpFltr = &utils.TPFilter{
Tenant: f.Tenant,
ID: f.ID,
Filters: make([]*utils.TPRequestFilter, len(f.RequestFilters)),
}
for i, reqFltr := range f.RequestFilters {
tpFltr.Filters[i] = &utils.TPRequestFilter{
Type: reqFltr.Type,
FieldName: reqFltr.FieldName,
Values: make([]string, len(reqFltr.Values)),
}
for j, val := range reqFltr.Values {
tpFltr.Filters[i].Values[j] = val
}
}
if f.ActivationInterval != nil {
tpFltr.ActivationInterval = &utils.TPActivationInterval{
ActivationTime: f.ActivationInterval.ActivationTime.Format(time.RFC3339),
ExpiryTime: f.ActivationInterval.ExpiryTime.Format(time.RFC3339),
}
}
return
}

View File

@@ -64,6 +64,7 @@ type TpReader struct {
revDests,
revAliases,
acntActionPlans map[string][]string
thdsIndexers map[string]*ReqFilterIndexer // tenant, indexer
}
func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string) *TpReader {
@@ -139,6 +140,7 @@ func (tpr *TpReader) Init() {
tpr.revDests = make(map[string][]string)
tpr.revAliases = make(map[string][]string)
tpr.acntActionPlans = make(map[string][]string)
tpr.thdsIndexers = make(map[string]*ReqFilterIndexer)
}
func (tpr *TpReader) LoadDestinationsFiltered(tag string) (bool, error) {
@@ -1658,7 +1660,7 @@ func (tpr *TpReader) LoadStats() error {
return tpr.LoadStatsFiltered("")
}
func (tpr *TpReader) LoadThresholdsFiltered(tag string) error {
func (tpr *TpReader) LoadThresholdsFiltered(tag string) (err error) {
tps, err := tpr.lr.GetTPThreshold(tpr.tpid, tag)
if err != nil {
return err
@@ -1672,13 +1674,35 @@ func (tpr *TpReader) LoadThresholdsFiltered(tag string) error {
}
tpr.thProfiles = mapTHs
for tenant, mpID := range mapTHs {
for thID := range mpID {
thdIndxrKey := utils.ThresholdStringIndex + tenant
for thID, t := range mpID {
thTntID := &utils.TenantID{Tenant: tenant, ID: thID}
if has, err := tpr.dm.DataDB().HasData(utils.ThresholdPrefix, thTntID.TenantID()); err != nil {
return err
} else if !has {
tpr.thresholds = append(tpr.thresholds, thTntID)
}
// index thresholds for filters
if _, has := tpr.thdsIndexers[tenant]; !has {
if tpr.thdsIndexers[tenant], err = NewReqFilterIndexer(tpr.dm, thdIndxrKey); err != nil {
return
}
}
for _, fltrID := range t.FilterIDs {
tpFltr, has := tpr.filters[utils.TenantID{tenant, fltrID}]
if !has {
var fltr *Filter
if fltr, err = tpr.dm.GetFilter(tenant, fltrID, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %s for threshold: %s", fltrID, thID)
}
return
} else {
tpFltr = FilterToTPFilter(fltr)
}
}
tpr.thdsIndexers[tenant].IndexTPFilter(tpFltr, thID)
}
}
}
return nil
@@ -1998,6 +2022,21 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Printf("\t %s : %+v", id, vals)
}
}
if verbose {
log.Print("Filters:")
}
for _, tpTH := range tpr.filters {
th, err := APItoFilter(tpTH, tpr.timezone)
if err != nil {
return err
}
if err = tpr.dm.SetFilter(th); err != nil {
return err
}
if verbose {
log.Print("\t", th.TenantID())
}
}
if verbose {
log.Print("ResourceProfiles:")
}
@@ -2091,21 +2130,6 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", thd.TenantID())
}
}
if verbose {
log.Print("Filters:")
}
for _, tpTH := range tpr.filters {
th, err := APItoFilter(tpTH, tpr.timezone)
if err != nil {
return err
}
if err = tpr.dm.SetFilter(th); err != nil {
return err
}
if verbose {
log.Print("\t", th.TenantID())
}
}
if verbose {
log.Print("Timings:")
}
@@ -2190,32 +2214,17 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
}
}
}
/*
if len(tpr.thProfiles) > 0 {
if verbose {
log.Print("Indexing thresholds")
}
for tenant, mpID := range tpr.thProfiles {
stIdxr, err := NewReqFilterIndexer(tpr.dm, utils.ThresholdsIndex+tenant)
if err != nil {
return err
}
for _, tpTH := range mpID {
if th, err := APItoThresholdProfile(tpTH, tpr.timezone); err != nil {
return err
} else {
stIdxr.IndexFilters(th.ID, th.Filters)
}
}
if verbose {
log.Printf("Indexed thresholds tenant: %s, keys %+v", tenant, stIdxr.ChangedKeys().Slice())
}
if err := stIdxr.StoreIndexes(); err != nil {
return err
}
}
if verbose {
log.Print("Threshold filter indexes:")
}
for tenant, fltrIdxer := range tpr.thdsIndexers {
if err := fltrIdxer.StoreIndexes(); err != nil {
return err
}
*/
if verbose {
log.Printf("Tenant: %s, keys %+v", tenant, fltrIdxer.ChangedKeys().Slice())
}
}
}
return
}

View File

@@ -1287,12 +1287,6 @@ type TPResource struct {
Thresholds []string // Thresholds to check after changing Limit
}
type TPRequestFilter struct {
Type string // Filter type (*string, *timing, *rsr_filters, *cdr_stats)
FieldName string // Name of the field providing us the Values to check (used in case of some )
Values []string // Filter definition
}
// TPActivationInterval represents an activation interval for an item
type TPActivationInterval struct {
ActivationTime,
@@ -1381,3 +1375,9 @@ type TPFilter struct {
Filters []*TPRequestFilter
ActivationInterval *TPActivationInterval // Time when this limit becomes active and expires
}
type TPRequestFilter struct {
Type string // Filter type (*string, *timing, *rsr_filters, *cdr_stats)
FieldName string // Name of the field providing us the Values to check (used in case of some )
Values []string // Filter definition
}