Fixed pull conflicts and added filterindexecache_it_test.go

This commit is contained in:
edwardro22
2018-01-12 13:44:10 +02:00
committed by Dan Christian Bogos
parent 26b80f2440
commit 1b7e0c8e8c
11 changed files with 889 additions and 329 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -81,6 +81,9 @@ func (apierV1 *ApierV1) SetResourceProfile(res *engine.ResourceProfile, reply *s
if err := apierV1.DataManager.SetResourceProfile(res, true); err != nil {
return utils.APIErrorHandler(err)
}
if err := apierV1.DataManager.SetResource(&engine.Resource{Tenant: res.Tenant, ID: res.ID, Usages: make(map[string]*engine.ResourceUsage)}); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
return nil
}

View File

@@ -45,6 +45,17 @@ func (apierV1 *ApierV1) SetStatQueueProfile(sqp *engine.StatQueueProfile, reply
if err := apierV1.DataManager.SetStatQueueProfile(sqp, true); err != nil {
return utils.APIErrorHandler(err)
}
metrics := make(map[string]engine.StatMetric)
for _, metricwithparam := range sqp.Metrics {
if metric, err := engine.NewStatMetric(metricwithparam.MetricID, sqp.MinItems, metricwithparam.Parameters); err != nil {
return utils.APIErrorHandler(err)
} else {
metrics[metricwithparam.MetricID] = metric
}
}
if err := apierV1.DataManager.SetStatQueue(&engine.StatQueue{Tenant: sqp.Tenant, ID: sqp.ID, SQMetrics: metrics}); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
return nil
}

View File

@@ -307,11 +307,11 @@ func testV1STSSetStatQueueProfile(t *testing.T) {
TTL: time.Duration(10) * time.Second,
Metrics: []*utils.MetricWithParams{
&utils.MetricWithParams{
MetricID: "MetricValue",
MetricID: utils.MetaSum,
Parameters: "",
},
&utils.MetricWithParams{
MetricID: "MetricValueTwo",
MetricID: utils.MetaAverage,
Parameters: "",
},
},

View File

@@ -79,6 +79,9 @@ func (apierV1 *ApierV1) SetThresholdProfile(thp *engine.ThresholdProfile, reply
if err := apierV1.DataManager.SetThresholdProfile(thp, true); err != nil {
return utils.APIErrorHandler(err)
}
if err := apierV1.DataManager.SetThreshold(&engine.Threshold{Tenant: thp.Tenant, ID: thp.ID}); err != nil {
return err
}
*reply = utils.OK
return nil
}

View File

@@ -59,6 +59,7 @@ func (alS *AttributeService) matchingAttributeProfilesForEvent(ev *utils.CGREven
if ev.Context != nil && *ev.Context != "" {
contextVal = *ev.Context
}
attrIdxKey = utils.ConcatenatedKey(ev.Tenant, contextVal)
matchingAPs := make(map[string]*AttributeProfile)
aPrflIDs, err := matchingItemIDsForEvent(ev.Event, alS.indexedFields,

View File

@@ -431,6 +431,8 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool)
return
}
}
cache.RemPrefixKey(utils.ThresholdStringIndex, true, utils.NonTransactional)
cache.RemPrefixKey(utils.ThresholdStringRevIndex, true, utils.NonTransactional)
return
}
@@ -486,7 +488,7 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool
var fltr *Filter
if fltr, err = dm.GetFilter(sqp.Tenant, fltrID, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, sqp)
err = fmt.Errorf("broken reference to filter: %+v for statqueue: %+v", fltrID, sqp)
}
return
}
@@ -500,13 +502,14 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool
}
}
}
indexer.IndexTPFilter(FilterToTPFilter(fltr), sqp.ID)
}
if err = indexer.StoreIndexes(); err != nil {
return
}
}
cache.RemPrefixKey(utils.StatQueuesStringIndex, true, utils.NonTransactional)
cache.RemPrefixKey(utils.StatQueuesStringRevIndex, true, utils.NonTransactional)
return
}
@@ -1182,6 +1185,9 @@ func (dm *DataManager) SetAttributeProfile(ap *AttributeProfile, withIndex bool)
}
}
}
cache.RemPrefixKey(utils.AttributeProfilesStringIndex, true, utils.NonTransactional)
cache.RemPrefixKey(utils.AttributeProfilesStringRevIndex, true, utils.NonTransactional)
return
}

View File

@@ -154,7 +154,6 @@ func (rfi *ReqFilterIndexer) RemoveItemFromIndex(itemID string) (err error) {
}
}
}
rfi.reveseIndex[itemID] = make(utils.StringMap) //Force deleting in driver
if err = rfi.dm.SetFilterIndexes(
GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false),

View File

@@ -83,6 +83,10 @@ func (fS *FilterS) PassFiltersForEvent(tenant string, ev map[string]interface{},
!f.ActivationInterval.IsActiveAtTime(time.Now()) { // not active
continue
}
for _, fltr22 := range f.RequestFilters {
strVal, _ := utils.ReflectFieldAsString(ev, fltr22.FieldName, "")
}
for _, fltr := range f.RequestFilters {
switch fltr.Type {
case MetaString:

View File

@@ -169,6 +169,7 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *utils.CGREvent) (sqs StatQ
if err != nil {
return nil, err
}
if sqPrfl.Stored && s.dirty == nil {
s.dirty = utils.BoolPointer(false)
}

View File

@@ -216,12 +216,16 @@ func (tS *ThresholdService) StoreThreshold(t *Threshold) (err error) {
func (tS *ThresholdService) matchingThresholdsForEvent(ev *utils.CGREvent) (ts Thresholds, err error) {
matchingTs := make(map[string]*Threshold)
tIDs, err := matchingItemIDsForEvent(ev.Event, tS.indexedFields, tS.dm, utils.ThresholdStringIndex+ev.Tenant)
for _, itm := range tIDs {
}
if err != nil {
return nil, err
}
lockIDs := utils.PrefixSliceItems(tIDs.Slice(), utils.ThresholdStringIndex)
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
for tID := range tIDs {
tPrfl, err := tS.dm.GetThresholdProfile(ev.Tenant, tID, false, utils.NonTransactional)
if err != nil {
@@ -234,11 +238,13 @@ func (tS *ThresholdService) matchingThresholdsForEvent(ev *utils.CGREvent) (ts T
!tPrfl.ActivationInterval.IsActiveAtTime(time.Now()) { // not active
continue
}
if pass, err := tS.filterS.PassFiltersForEvent(ev.Tenant, ev.Event, tPrfl.FilterIDs); err != nil {
return nil, err
} else if !pass {
continue
}
t, err := tS.dm.GetThreshold(tPrfl.Tenant, tPrfl.ID, false, "")
if err != nil {
return nil, err