Merge branch 'master' into master

This commit is contained in:
Dan Christian Bogos
2017-10-24 18:50:29 +02:00
committed by GitHub
7 changed files with 185 additions and 149 deletions

View File

@@ -57,13 +57,14 @@ type TpReader struct {
resProfiles map[string]map[string]*utils.TPResource
sqProfiles map[string]map[string]*utils.TPStats
thProfiles map[string]map[string]*utils.TPThreshold
filters map[string]map[string]*utils.TPFilter
filters map[utils.TenantID]*utils.TPFilter
resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles
statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles
thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles
revDests,
revAliases,
acntActionPlans map[string][]string
thdsIndexers map[string]*ReqFilterIndexer // tenant, indexer
}
func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string) *TpReader {
@@ -135,10 +136,11 @@ func (tpr *TpReader) Init() {
tpr.resProfiles = make(map[string]map[string]*utils.TPResource)
tpr.sqProfiles = make(map[string]map[string]*utils.TPStats)
tpr.thProfiles = make(map[string]map[string]*utils.TPThreshold)
tpr.filters = make(map[string]map[string]*utils.TPFilter)
tpr.filters = make(map[utils.TenantID]*utils.TPFilter)
tpr.revDests = make(map[string][]string)
tpr.revAliases = make(map[string][]string)
tpr.acntActionPlans = make(map[string][]string)
tpr.thdsIndexers = make(map[string]*ReqFilterIndexer)
}
func (tpr *TpReader) LoadDestinationsFiltered(tag string) (bool, error) {
@@ -1658,8 +1660,9 @@ func (tpr *TpReader) LoadStats() error {
return tpr.LoadStatsFiltered("")
}
func (tpr *TpReader) LoadThresholdsFiltered(tag string) error {
tps, err := tpr.lr.GetTPThresholds(tpr.tpid, tag)
func (tpr *TpReader) LoadThresholdsFiltered(tag string) (err error) {
tps, err := tpr.lr.GetTPThreshold(tpr.tpid, tag)
if err != nil {
return err
}
@@ -1672,13 +1675,35 @@ func (tpr *TpReader) LoadThresholdsFiltered(tag string) error {
}
tpr.thProfiles = mapTHs
for tenant, mpID := range mapTHs {
for thID := range mpID {
thdIndxrKey := utils.ThresholdStringIndex + tenant
for thID, t := range mpID {
thTntID := &utils.TenantID{Tenant: tenant, ID: thID}
if has, err := tpr.dm.DataDB().HasData(utils.ThresholdPrefix, thTntID.TenantID()); err != nil {
return err
} else if !has {
tpr.thresholds = append(tpr.thresholds, thTntID)
}
// index thresholds for filters
if _, has := tpr.thdsIndexers[tenant]; !has {
if tpr.thdsIndexers[tenant], err = NewReqFilterIndexer(tpr.dm, thdIndxrKey); err != nil {
return
}
}
for _, fltrID := range t.FilterIDs {
tpFltr, has := tpr.filters[utils.TenantID{tenant, fltrID}]
if !has {
var fltr *Filter
if fltr, err = tpr.dm.GetFilter(tenant, fltrID, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %s for threshold: %s", fltrID, thID)
}
return
} else {
tpFltr = FilterToTPFilter(fltr)
}
}
tpr.thdsIndexers[tenant].IndexTPFilter(tpFltr, thID)
}
}
}
return nil
@@ -1693,12 +1718,9 @@ func (tpr *TpReader) LoadFilterFiltered(tag string) error {
if err != nil {
return err
}
mapTHs := make(map[string]map[string]*utils.TPFilter)
mapTHs := make(map[utils.TenantID]*utils.TPFilter)
for _, th := range tps {
if _, has := mapTHs[th.Tenant]; !has {
mapTHs[th.Tenant] = make(map[string]*utils.TPFilter)
}
mapTHs[th.Tenant][th.ID] = th
mapTHs[utils.TenantID{th.Tenant, th.ID}] = th
}
tpr.filters = mapTHs
return nil
@@ -2001,6 +2023,21 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Printf("\t %s : %+v", id, vals)
}
}
if verbose {
log.Print("Filters:")
}
for _, tpTH := range tpr.filters {
th, err := APItoFilter(tpTH, tpr.timezone)
if err != nil {
return err
}
if err = tpr.dm.SetFilter(th); err != nil {
return err
}
if verbose {
log.Print("\t", th.TenantID())
}
}
if verbose {
log.Print("ResourceProfiles:")
}
@@ -2094,23 +2131,6 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", thd.TenantID())
}
}
if verbose {
log.Print("Filters:")
}
for _, mpID := range tpr.filters {
for _, tpTH := range mpID {
th, err := APItoFilter(tpTH, tpr.timezone)
if err != nil {
return err
}
if err = tpr.dm.SetFilter(th); err != nil {
return err
}
if verbose {
log.Print("\t", th.TenantID())
}
}
}
if verbose {
log.Print("Timings:")
}
@@ -2195,54 +2215,15 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
}
}
}
/*
if len(tpr.thProfiles) > 0 {
if verbose {
log.Print("Indexing thresholds")
}
for tenant, mpID := range tpr.thProfiles {
stIdxr, err := NewReqFilterIndexer(tpr.dm, utils.ThresholdsIndex+tenant)
if err != nil {
return err
}
for _, tpTH := range mpID {
if th, err := APItoThresholdProfile(tpTH, tpr.timezone); err != nil {
return err
} else {
stIdxr.IndexFilters(th.ID, th.Filters)
}
}
if verbose {
log.Printf("Indexed thresholds tenant: %s, keys %+v", tenant, stIdxr.ChangedKeys().Slice())
}
if err := stIdxr.StoreIndexes(); err != nil {
return err
}
}
if verbose {
log.Print("Threshold filter indexes:")
}
for tenant, fltrIdxer := range tpr.thdsIndexers {
if err := fltrIdxer.StoreIndexes(); err != nil {
return err
}
*/
if len(tpr.filters) > 0 {
if verbose {
log.Print("Indexing Filters")
}
for tenant, mpID := range tpr.filters {
stIdxr, err := NewReqFilterIndexer(tpr.dm, utils.FilterIndex+tenant)
if err != nil {
return err
}
for _, tpTH := range mpID {
if th, err := APItoFilter(tpTH, tpr.timezone); err != nil {
return err
} else {
stIdxr.IndexFilters(th.ID, th.RequestFilters)
}
}
if verbose {
log.Printf("Indexed filters tenant: %s, keys %+v", tenant, stIdxr.ChangedKeys().Slice())
}
if err := stIdxr.StoreIndexes(); err != nil {
return err
}
log.Printf("Tenant: %s, keys %+v", tenant, fltrIdxer.ChangedKeys().Slice())
}
}
}
@@ -2467,7 +2448,7 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) {
keys := make([]string, len(tpr.filters))
i := 0
for k := range tpr.filters {
keys[i] = k
keys[i] = k.TenantID()
i++
}
return keys, nil