From 6c8a5d58393ac8a9a49cbc56f8bdef58ca98d011 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 24 Oct 2017 15:10:04 +0200 Subject: [PATCH 1/4] ThresholdsIndex -> ThresholdStringIndex --- engine/thresholds.go | 2 +- utils/consts.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/thresholds.go b/engine/thresholds.go index 8b88c712b..817500519 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -250,7 +250,7 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) { // matchingThresholdsForEvent returns ordered list of matching thresholds which are active for an Event func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts Thresholds, err error) { matchingTs := make(map[string]*Threshold) - tIDs, err := matchingItemIDsForEvent(ev.Event, tS.dm, utils.ThresholdsIndex+ev.Tenant) + tIDs, err := matchingItemIDsForEvent(ev.Event, tS.dm, utils.ThresholdStringIndex+ev.Tenant) if err != nil { return nil, err } diff --git a/utils/consts.go b/utils/consts.go index 214959d67..0e3520b2e 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -249,7 +249,7 @@ const ( ResourceProfilesPrefix = "rsp_" StatQueuesStringIndex = "ssi_" ThresholdPrefix = "thd_" - ThresholdsIndex = "thi_" + ThresholdStringIndex = "tsi_" TimingsPrefix = "tmg_" FilterPrefix = "ftr_" FilterIndex = "fti_" From 9dfdfa95d4de5819abf7abbaf700d76096f7a4bb Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 24 Oct 2017 15:24:06 +0200 Subject: [PATCH 2/4] ThresholdsIndex -> ThresholdStringIndex --- engine/thresholds.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/thresholds.go b/engine/thresholds.go index 817500519..a06223618 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -254,7 +254,7 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *ThresholdEvent) (ts T if err != nil { return nil, err } - lockIDs := utils.PrefixSliceItems(tIDs.Slice(), utils.ThresholdsIndex) + lockIDs := utils.PrefixSliceItems(tIDs.Slice(), utils.ThresholdStringIndex) guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...) defer guardian.Guardian.UnguardIDs(lockIDs...) for tID := range tIDs { From 8dde242be724d156a6ac162a9aa0d6dbd2b63a8c Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 24 Oct 2017 16:09:32 +0200 Subject: [PATCH 3/4] TPReader indexing loaded filters on TenantID --- engine/loader_csv_test.go | 131 +++++++++++++++++++------------------- engine/tp_reader.go | 59 +++++------------ 2 files changed, 80 insertions(+), 110 deletions(-) diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index b4182d807..a95c95a86 100755 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -1510,84 +1510,83 @@ func TestLoadThresholdProfiles(t *testing.T) { } func TestLoadFilters(t *testing.T) { - eFilters := map[string]map[string]*utils.TPFilter{ - "cgrates.org": map[string]*utils.TPFilter{ - "FLTR_1": &utils.TPFilter{ - TPid: testTPID, - Tenant: "cgrates.org", - ID: "FLTR_1", - Filters: []*utils.TPRequestFilter{ - &utils.TPRequestFilter{ - FieldName: "Account", - Type: "*string", - Values: []string{"1001", "1002"}, - }, - &utils.TPRequestFilter{ - FieldName: "Destination", - Type: "*string_prefix", - Values: []string{"10", "20"}, - }, - &utils.TPRequestFilter{ - FieldName: "", - Type: "*rsr_fields", - Values: []string{"Subject(~^1.*1$)", "Destination(1002)"}, - }, + eFilters := map[utils.TenantID]*utils.TPFilter{ + utils.TenantID{"cgrates.org", "FLTR_1"}: &utils.TPFilter{ + TPid: testTPID, + Tenant: "cgrates.org", + ID: "FLTR_1", + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{ + FieldName: "Account", + Type: "*string", + Values: []string{"1001", "1002"}, }, - ActivationInterval: &utils.TPActivationInterval{ - ActivationTime: "2014-07-29T15:00:00Z", + &utils.TPRequestFilter{ + FieldName: "Destination", + Type: "*string_prefix", + Values: []string{"10", "20"}, + }, + &utils.TPRequestFilter{ + FieldName: "", + Type: "*rsr_fields", + Values: []string{"Subject(~^1.*1$)", "Destination(1002)"}, }, }, - "FLTR_ACNT_dan": &utils.TPFilter{ - TPid: testTPID, - Tenant: "cgrates.org", - ID: "FLTR_ACNT_dan", - Filters: []*utils.TPRequestFilter{ - &utils.TPRequestFilter{ - FieldName: "Account", - Type: "*string", - Values: []string{"dan"}, - }, - }, - ActivationInterval: &utils.TPActivationInterval{ - ActivationTime: "2014-07-29T15:00:00Z", + ActivationInterval: &utils.TPActivationInterval{ + ActivationTime: "2014-07-29T15:00:00Z", + }, + }, + utils.TenantID{"cgrates.org", "FLTR_ACNT_dan"}: &utils.TPFilter{ + TPid: testTPID, + Tenant: "cgrates.org", + ID: "FLTR_ACNT_dan", + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{ + FieldName: "Account", + Type: "*string", + Values: []string{"dan"}, }, }, - "FLTR_DST_DE": &utils.TPFilter{ - TPid: testTPID, - Tenant: "cgrates.org", - ID: "FLTR_DST_DE", - Filters: []*utils.TPRequestFilter{ - &utils.TPRequestFilter{ - FieldName: "Destination", - Type: "*destinations", - Values: []string{"DST_DE"}, - }, - }, - ActivationInterval: &utils.TPActivationInterval{ - ActivationTime: "2014-07-29T15:00:00Z", + ActivationInterval: &utils.TPActivationInterval{ + ActivationTime: "2014-07-29T15:00:00Z", + }, + }, + utils.TenantID{"cgrates.org", "FLTR_DST_DE"}: &utils.TPFilter{ + TPid: testTPID, + Tenant: "cgrates.org", + ID: "FLTR_DST_DE", + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{ + FieldName: "Destination", + Type: "*destinations", + Values: []string{"DST_DE"}, }, }, - "FLTR_DST_NL": &utils.TPFilter{ - TPid: testTPID, - Tenant: "cgrates.org", - ID: "FLTR_DST_NL", - Filters: []*utils.TPRequestFilter{ - &utils.TPRequestFilter{ - FieldName: "Destination", - Type: "*destinations", - Values: []string{"DST_NL"}, - }, - }, - ActivationInterval: &utils.TPActivationInterval{ - ActivationTime: "2014-07-29T15:00:00Z", + ActivationInterval: &utils.TPActivationInterval{ + ActivationTime: "2014-07-29T15:00:00Z", + }, + }, + utils.TenantID{"cgrates.org", "FLTR_DST_NL"}: &utils.TPFilter{ + TPid: testTPID, + Tenant: "cgrates.org", + ID: "FLTR_DST_NL", + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{ + FieldName: "Destination", + Type: "*destinations", + Values: []string{"DST_NL"}, }, }, + ActivationInterval: &utils.TPActivationInterval{ + ActivationTime: "2014-07-29T15:00:00Z", + }, }, } - if len(csvr.filters["cgrates.org"]) != len(eFilters["cgrates.org"]) { + fltrKey := utils.TenantID{"cgrates.org", "FLTR_1"} + if len(csvr.filters) != len(eFilters) { t.Errorf("Failed to load FilterProfiles: %s", utils.ToIJSON(csvr.filters)) - } else if !reflect.DeepEqual(eFilters["cgrates.org"]["FLTR_1"], csvr.filters["cgrates.org"]["FLTR_1"]) { - t.Errorf("Expecting: %+v, received: %+v", eFilters["cgrates.org"]["FLTR_1"], csvr.filters["cgrates.org"]["FLTR_1"]) + } else if !reflect.DeepEqual(eFilters[fltrKey], csvr.filters[fltrKey]) { + t.Errorf("Expecting: %+v, received: %+v", eFilters[fltrKey], csvr.filters[fltrKey]) } } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 9fd1f7700..0c7388820 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -57,7 +57,7 @@ type TpReader struct { resProfiles map[string]map[string]*utils.TPResource sqProfiles map[string]map[string]*utils.TPStats thProfiles map[string]map[string]*utils.TPThreshold - filters map[string]map[string]*utils.TPFilter + filters map[utils.TenantID]*utils.TPFilter resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles @@ -135,7 +135,7 @@ func (tpr *TpReader) Init() { tpr.resProfiles = make(map[string]map[string]*utils.TPResource) tpr.sqProfiles = make(map[string]map[string]*utils.TPStats) tpr.thProfiles = make(map[string]map[string]*utils.TPThreshold) - tpr.filters = make(map[string]map[string]*utils.TPFilter) + tpr.filters = make(map[utils.TenantID]*utils.TPFilter) tpr.revDests = make(map[string][]string) tpr.revAliases = make(map[string][]string) tpr.acntActionPlans = make(map[string][]string) @@ -1693,12 +1693,9 @@ func (tpr *TpReader) LoadFilterFiltered(tag string) error { if err != nil { return err } - mapTHs := make(map[string]map[string]*utils.TPFilter) + mapTHs := make(map[utils.TenantID]*utils.TPFilter) for _, th := range tps { - if _, has := mapTHs[th.Tenant]; !has { - mapTHs[th.Tenant] = make(map[string]*utils.TPFilter) - } - mapTHs[th.Tenant][th.ID] = th + mapTHs[utils.TenantID{th.Tenant, th.ID}] = th } tpr.filters = mapTHs return nil @@ -2097,18 +2094,16 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if verbose { log.Print("Filters:") } - for _, mpID := range tpr.filters { - for _, tpTH := range mpID { - th, err := APItoFilter(tpTH, tpr.timezone) - if err != nil { - return err - } - if err = tpr.dm.SetFilter(th); err != nil { - return err - } - if verbose { - log.Print("\t", th.TenantID()) - } + for _, tpTH := range tpr.filters { + th, err := APItoFilter(tpTH, tpr.timezone) + if err != nil { + return err + } + if err = tpr.dm.SetFilter(th); err != nil { + return err + } + if verbose { + log.Print("\t", th.TenantID()) } } if verbose { @@ -2221,30 +2216,6 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } */ - if len(tpr.filters) > 0 { - if verbose { - log.Print("Indexing Filters") - } - for tenant, mpID := range tpr.filters { - stIdxr, err := NewReqFilterIndexer(tpr.dm, utils.FilterIndex+tenant) - if err != nil { - return err - } - for _, tpTH := range mpID { - if th, err := APItoFilter(tpTH, tpr.timezone); err != nil { - return err - } else { - stIdxr.IndexFilters(th.ID, th.RequestFilters) - } - } - if verbose { - log.Printf("Indexed filters tenant: %s, keys %+v", tenant, stIdxr.ChangedKeys().Slice()) - } - if err := stIdxr.StoreIndexes(); err != nil { - return err - } - } - } } return } @@ -2467,7 +2438,7 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) { keys := make([]string, len(tpr.filters)) i := 0 for k := range tpr.filters { - keys[i] = k + keys[i] = k.TenantID() i++ } return keys, nil From 9d695d517ce5b4cc4de29b3a7a831c16ef94ea72 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 24 Oct 2017 18:45:21 +0200 Subject: [PATCH 4/4] Index for threshold filters --- engine/filterindexer.go | 31 ++++++++++++++ engine/model_helpers.go | 25 +++++++++++ engine/tp_reader.go | 93 ++++++++++++++++++++++------------------- utils/apitpdata.go | 12 +++--- 4 files changed, 113 insertions(+), 48 deletions(-) diff --git a/engine/filterindexer.go b/engine/filterindexer.go index b456f8366..ada8b6b41 100644 --- a/engine/filterindexer.go +++ b/engine/filterindexer.go @@ -80,6 +80,37 @@ func (rfi *ReqFilterIndexer) IndexFilters(itemID string, reqFltrs []*RequestFilt return } +// IndexFilters parses reqFltrs, adding itemID in the indexes and marks the changed keys in chngdIndxKeys +func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilter, itemID string) { + var hasMetaString bool + for _, fltr := range tpFltr.Filters { + if fltr.Type != MetaString { + continue + } + hasMetaString = true // Mark that we found at least one metatring so we don't index globally + if _, hastIt := rfi.indexes[fltr.FieldName]; !hastIt { + rfi.indexes[fltr.FieldName] = make(map[string]utils.StringMap) + } + for _, fldVal := range fltr.Values { + if _, hasIt := rfi.indexes[fltr.FieldName][fldVal]; !hasIt { + rfi.indexes[fltr.FieldName][fldVal] = make(utils.StringMap) + } + rfi.indexes[fltr.FieldName][fldVal][itemID] = true + rfi.chngdIndxKeys[utils.ConcatenatedKey(fltr.FieldName, fldVal)] = true + } + } + if !hasMetaString { + if _, hasIt := rfi.indexes[utils.NOT_AVAILABLE]; !hasIt { + rfi.indexes[utils.NOT_AVAILABLE] = make(map[string]utils.StringMap) + } + if _, hasIt := rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE]; !hasIt { + rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE] = make(utils.StringMap) + } + rfi.indexes[utils.NOT_AVAILABLE][utils.NOT_AVAILABLE][itemID] = true // Fields without real field index will be located in map[NOT_AVAILABLE][NOT_AVAILABLE][rl.ID] + } + return +} + // StoreIndexes handles storing the indexes to dataDB func (rfi *ReqFilterIndexer) StoreIndexes() error { return rfi.dm.DataDB().SetReqFilterIndexes(rfi.dbKey, rfi.indexes) diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 2e6856f0c..78c100d71 100755 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -2419,3 +2419,28 @@ func APItoFilter(tpTH *utils.TPFilter, timezone string) (th *Filter, err error) } return th, nil } + +func FilterToTPFilter(f *Filter) (tpFltr *utils.TPFilter) { + tpFltr = &utils.TPFilter{ + Tenant: f.Tenant, + ID: f.ID, + Filters: make([]*utils.TPRequestFilter, len(f.RequestFilters)), + } + for i, reqFltr := range f.RequestFilters { + tpFltr.Filters[i] = &utils.TPRequestFilter{ + Type: reqFltr.Type, + FieldName: reqFltr.FieldName, + Values: make([]string, len(reqFltr.Values)), + } + for j, val := range reqFltr.Values { + tpFltr.Filters[i].Values[j] = val + } + } + if f.ActivationInterval != nil { + tpFltr.ActivationInterval = &utils.TPActivationInterval{ + ActivationTime: f.ActivationInterval.ActivationTime.Format(time.RFC3339), + ExpiryTime: f.ActivationInterval.ExpiryTime.Format(time.RFC3339), + } + } + return +} diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 0c7388820..7562cfdee 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -64,6 +64,7 @@ type TpReader struct { revDests, revAliases, acntActionPlans map[string][]string + thdsIndexers map[string]*ReqFilterIndexer // tenant, indexer } func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string) *TpReader { @@ -139,6 +140,7 @@ func (tpr *TpReader) Init() { tpr.revDests = make(map[string][]string) tpr.revAliases = make(map[string][]string) tpr.acntActionPlans = make(map[string][]string) + tpr.thdsIndexers = make(map[string]*ReqFilterIndexer) } func (tpr *TpReader) LoadDestinationsFiltered(tag string) (bool, error) { @@ -1658,7 +1660,7 @@ func (tpr *TpReader) LoadStats() error { return tpr.LoadStatsFiltered("") } -func (tpr *TpReader) LoadThresholdsFiltered(tag string) error { +func (tpr *TpReader) LoadThresholdsFiltered(tag string) (err error) { tps, err := tpr.lr.GetTPThreshold(tpr.tpid, tag) if err != nil { return err @@ -1672,13 +1674,35 @@ func (tpr *TpReader) LoadThresholdsFiltered(tag string) error { } tpr.thProfiles = mapTHs for tenant, mpID := range mapTHs { - for thID := range mpID { + thdIndxrKey := utils.ThresholdStringIndex + tenant + for thID, t := range mpID { thTntID := &utils.TenantID{Tenant: tenant, ID: thID} if has, err := tpr.dm.DataDB().HasData(utils.ThresholdPrefix, thTntID.TenantID()); err != nil { return err } else if !has { tpr.thresholds = append(tpr.thresholds, thTntID) } + // index thresholds for filters + if _, has := tpr.thdsIndexers[tenant]; !has { + if tpr.thdsIndexers[tenant], err = NewReqFilterIndexer(tpr.dm, thdIndxrKey); err != nil { + return + } + } + for _, fltrID := range t.FilterIDs { + tpFltr, has := tpr.filters[utils.TenantID{tenant, fltrID}] + if !has { + var fltr *Filter + if fltr, err = tpr.dm.GetFilter(tenant, fltrID, false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %s for threshold: %s", fltrID, thID) + } + return + } else { + tpFltr = FilterToTPFilter(fltr) + } + } + tpr.thdsIndexers[tenant].IndexTPFilter(tpFltr, thID) + } } } return nil @@ -1998,6 +2022,21 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Printf("\t %s : %+v", id, vals) } } + if verbose { + log.Print("Filters:") + } + for _, tpTH := range tpr.filters { + th, err := APItoFilter(tpTH, tpr.timezone) + if err != nil { + return err + } + if err = tpr.dm.SetFilter(th); err != nil { + return err + } + if verbose { + log.Print("\t", th.TenantID()) + } + } if verbose { log.Print("ResourceProfiles:") } @@ -2091,21 +2130,6 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("\t", thd.TenantID()) } } - if verbose { - log.Print("Filters:") - } - for _, tpTH := range tpr.filters { - th, err := APItoFilter(tpTH, tpr.timezone) - if err != nil { - return err - } - if err = tpr.dm.SetFilter(th); err != nil { - return err - } - if verbose { - log.Print("\t", th.TenantID()) - } - } if verbose { log.Print("Timings:") } @@ -2190,32 +2214,17 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } } - /* - if len(tpr.thProfiles) > 0 { - if verbose { - log.Print("Indexing thresholds") - } - for tenant, mpID := range tpr.thProfiles { - stIdxr, err := NewReqFilterIndexer(tpr.dm, utils.ThresholdsIndex+tenant) - if err != nil { - return err - } - for _, tpTH := range mpID { - if th, err := APItoThresholdProfile(tpTH, tpr.timezone); err != nil { - return err - } else { - stIdxr.IndexFilters(th.ID, th.Filters) - } - } - if verbose { - log.Printf("Indexed thresholds tenant: %s, keys %+v", tenant, stIdxr.ChangedKeys().Slice()) - } - if err := stIdxr.StoreIndexes(); err != nil { - return err - } - } + if verbose { + log.Print("Threshold filter indexes:") + } + for tenant, fltrIdxer := range tpr.thdsIndexers { + if err := fltrIdxer.StoreIndexes(); err != nil { + return err } - */ + if verbose { + log.Printf("Tenant: %s, keys %+v", tenant, fltrIdxer.ChangedKeys().Slice()) + } + } } return } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index f619f38fc..71b515922 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1287,12 +1287,6 @@ type TPResource struct { Thresholds []string // Thresholds to check after changing Limit } -type TPRequestFilter struct { - Type string // Filter type (*string, *timing, *rsr_filters, *cdr_stats) - FieldName string // Name of the field providing us the Values to check (used in case of some ) - Values []string // Filter definition -} - // TPActivationInterval represents an activation interval for an item type TPActivationInterval struct { ActivationTime, @@ -1381,3 +1375,9 @@ type TPFilter struct { Filters []*TPRequestFilter ActivationInterval *TPActivationInterval // Time when this limit becomes active and expires } + +type TPRequestFilter struct { + Type string // Filter type (*string, *timing, *rsr_filters, *cdr_stats) + FieldName string // Name of the field providing us the Values to check (used in case of some ) + Values []string // Filter definition +}