From 9d695d517ce5b4cc4de29b3a7a831c16ef94ea72 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 24 Oct 2017 18:45:21 +0200 Subject: [PATCH] Index for threshold filters --- engine/filterindexer.go | 31 ++++++++++++++ engine/model_helpers.go | 25 +++++++++++ engine/tp_reader.go | 93 ++++++++++++++++++++++------------------- utils/apitpdata.go | 12 +++--- 4 files changed, 113 insertions(+), 48 deletions(-) diff --git a/engine/filterindexer.go b/engine/filterindexer.go index b456f8366..ada8b6b41 100644 --- a/engine/filterindexer.go +++ b/engine/filterindexer.go @@ -80,6 +80,37 @@ func (rfi *ReqFilterIndexer) IndexFilters(itemID string, reqFltrs []*RequestFilt return } +// IndexFilters parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys +func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilter, itemID string) { + var hasMetaString bool + for _, fltr := range tpFltr.Filters { + if fltr.Type != MetaString { + continue + } + hasMetaString = true // Mark that we found at least one metatring so we don't index globally + if _, hastIt := rfi.indexes[fltr.FieldName]; !hastIt { + rfi.indexes[fltr.FieldName] = make(map[string]utils.StringMap) + } + for _, fldVal := range fltr.Values { + if _, hasIt := rfi.indexes[fltr.FieldName][fldVal]; !hasIt { + rfi.indexes[fltr.FieldName][fldVal] = make(utils.StringMap) + } + rfi.indexes[fltr.FieldName][fldVal][itemID] = true + rfi.chngdIndxKeys[utils.ConcatenatedKey(fltr.FieldName, fldVal)] = true + } + } + if !hasMetaString { + if _, hasIt := rfi.indexes[utils.NOT_AVAILABLE]; !hasIt { + rfi.indexes[utils.NOT_AVAILABLE] = make(map[string]utils.StringMap) + } + if _, hasIt := rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE]; !hasIt { + rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE] = make(utils.StringMap) + } + rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE][itemID] = true // Fields without real field index will be located in map[NOT_AVAILABLE][NOT_AVAILABLE][rl.ID] + } + return +} + // StoreIndexes handles storing the indexes to dataDB func (rfi *ReqFilterIndexer) StoreIndexes() error { return rfi.dm.DataDB().SetReqFilterIndexes(rfi.dbKey, rfi.indexes) diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 2e6856f0c..78c100d71 100755 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -2419,3 +2419,28 @@ func APItoFilter(tpTH *utils.TPFilter, timezone string) (th *Filter, err error) } return th, nil } + +func FilterToTPFilter(f *Filter) (tpFltr *utils.TPFilter) { + tpFltr = &utils.TPFilter{ + Tenant: f.Tenant, + ID: f.ID, + Filters: make([]*utils.TPRequestFilter, len(f.RequestFilters)), + } + for i, reqFltr := range f.RequestFilters { + tpFltr.Filters[i] = &utils.TPRequestFilter{ + Type: reqFltr.Type, + FieldName: reqFltr.FieldName, + Values: make([]string, len(reqFltr.Values)), + } + for j, val := range reqFltr.Values { + tpFltr.Filters[i].Values[j] = val + } + } + if f.ActivationInterval != nil { + tpFltr.ActivationInterval = &utils.TPActivationInterval{ + ActivationTime: f.ActivationInterval.ActivationTime.Format(time.RFC3339), + ExpiryTime: f.ActivationInterval.ExpiryTime.Format(time.RFC3339), + } + } + return +} diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 0c7388820..7562cfdee 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -64,6 +64,7 @@ type TpReader struct { revDests, revAliases, acntActionPlans map[string][]string + thdsIndexers map[string]*ReqFilterIndexer // tenant, indexer } func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string) *TpReader { @@ -139,6 +140,7 @@ func (tpr *TpReader) Init() { tpr.revDests = make(map[string][]string) tpr.revAliases = make(map[string][]string) tpr.acntActionPlans = make(map[string][]string) + tpr.thdsIndexers = make(map[string]*ReqFilterIndexer) } func (tpr *TpReader) LoadDestinationsFiltered(tag string) (bool, error) { @@ -1658,7 +1660,7 @@ func (tpr *TpReader) LoadStats() error { return tpr.LoadStatsFiltered("") } -func (tpr *TpReader) LoadThresholdsFiltered(tag string) error { +func (tpr *TpReader) LoadThresholdsFiltered(tag string) (err error) { tps, err := tpr.lr.GetTPThreshold(tpr.tpid, tag) if err != nil { return err @@ -1672,13 +1674,35 @@ func (tpr *TpReader) LoadThresholdsFiltered(tag string) error { } tpr.thProfiles = mapTHs for tenant, mpID := range mapTHs { - for thID := range mpID { + thdIndxrKey := utils.ThresholdStringIndex + tenant + for thID, t := range mpID { thTntID := &utils.TenantID{Tenant: tenant, ID: thID} if has, err := tpr.dm.DataDB().HasData(utils.ThresholdPrefix, thTntID.TenantID()); err != nil { return err } else if !has { tpr.thresholds = append(tpr.thresholds, thTntID) } + // index thresholds for filters + if _, has := tpr.thdsIndexers[tenant]; !has { + if tpr.thdsIndexers[tenant], err = NewReqFilterIndexer(tpr.dm, thdIndxrKey); err != nil { + return + } + } + for _, fltrID := range t.FilterIDs { + tpFltr, has := tpr.filters[utils.TenantID{tenant, fltrID}] + if !has { + var fltr *Filter + if fltr, err = tpr.dm.GetFilter(tenant, fltrID, false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %s for threshold: %s", fltrID, thID) + } + return + } else { + tpFltr = FilterToTPFilter(fltr) + } + } + tpr.thdsIndexers[tenant].IndexTPFilter(tpFltr, thID) + } } } return nil @@ -1998,6 +2022,21 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Printf("\t %s : %+v", id, vals) } } + if verbose { + log.Print("Filters:") + } + for _, tpTH := range tpr.filters { + th, err := APItoFilter(tpTH, tpr.timezone) + if err != nil { + return err + } + if err = tpr.dm.SetFilter(th); err != nil { + return err + } + if verbose { + log.Print("\t", th.TenantID()) + } + } if verbose { log.Print("ResourceProfiles:") } @@ -2091,21 +2130,6 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("\t", thd.TenantID()) } } - if verbose { - log.Print("Filters:") - } - for _, tpTH := range tpr.filters { - th, err := APItoFilter(tpTH, tpr.timezone) - if err != nil { - return err - } - if err = tpr.dm.SetFilter(th); err != nil { - return err - } - if verbose { - log.Print("\t", th.TenantID()) - } - } if verbose { log.Print("Timings:") } @@ -2190,32 +2214,17 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } } - /* - if len(tpr.thProfiles) > 0 { - if verbose { - log.Print("Indexing thresholds") - } - for tenant, mpID := range tpr.thProfiles { - stIdxr, err := NewReqFilterIndexer(tpr.dm, utils.ThresholdsIndex+tenant) - if err != nil { - return err - } - for _, tpTH := range mpID { - if th, err := APItoThresholdProfile(tpTH, tpr.timezone); err != nil { - return err - } else { - stIdxr.IndexFilters(th.ID, th.Filters) - } - } - if verbose { - log.Printf("Indexed thresholds tenant: %s, keys %+v", tenant, stIdxr.ChangedKeys().Slice()) - } - if err := stIdxr.StoreIndexes(); err != nil { - return err - } - } + if verbose { + log.Print("Threshold filter indexes:") + } + for tenant, fltrIdxer := range tpr.thdsIndexers { + if err := fltrIdxer.StoreIndexes(); err != nil { + return err } - */ + if verbose { + log.Printf("Tenant: %s, keys %+v", tenant, fltrIdxer.ChangedKeys().Slice()) + } + } } return } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index f619f38fc..71b515922 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1287,12 +1287,6 @@ type TPResource struct { Thresholds []string // Thresholds to check after changing Limit } -type TPRequestFilter struct { - Type string // Filter type (*string, *timing, *rsr_filters, *cdr_stats) - FieldName string // Name of the field providing us the Values to check (used in case of some ) - Values []string // Filter definition -} - // TPActivationInterval represents an activation interval for an item type TPActivationInterval struct { ActivationTime, @@ -1381,3 +1375,9 @@ type TPFilter struct { Filters []*TPRequestFilter ActivationInterval *TPActivationInterval // Time when this limit becomes active and expires } + +type TPRequestFilter struct { + Type string // Filter type (*string, *timing, *rsr_filters, *cdr_stats) + FieldName string // Name of the field providing us the Values to check (used in case of some ) + Values []string // Filter definition +}