From 4f4ba6ed736618e047f117f567f2f426c7aeb03e Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 11 Oct 2017 17:01:23 +0300 Subject: [PATCH] Add Async option for Thresholds --- engine/loader_csv_test.go | 68 ++++++++++++++++++++++++++++++++------- engine/model_helpers.go | 17 +++++++--- engine/models.go | 1 + engine/storage_map.go | 6 +--- engine/thresholds.go | 1 + engine/tp_reader.go | 15 +++++++-- utils/apitpdata.go | 1 + 7 files changed, 85 insertions(+), 24 deletions(-) diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index 1b18440c7..4452bd529 100755 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -18,13 +18,12 @@ along with this program. If not, see package engine import ( + "github.com/cgrates/cgrates/cache" + "github.com/cgrates/cgrates/utils" "log" "reflect" "testing" "time" - - "github.com/cgrates/cgrates/cache" - "github.com/cgrates/cgrates/utils" ) var ( @@ -1509,8 +1508,52 @@ func TestLoadThresholdProfiles(t *testing.T) { } } +/* func TestLoadFilterProfiles(t *testing.T) { + eFilters := map[string]map[string]*utils.TPFilter{ + "cgrates.org": map[string]*utils.TPFilter{ + "FLTR_1": &utils.TPFilter{ + TPid: testTPID, + Tenant: "cgrates.org", + ID: "FLTR_1", + FilterType: "*string", + FilterFieldName: "Account", + FilterFielValues: []string{"1001", "1002"}, + }, + "FLTR_ACNT_dan": &utils.TPFilter{ + TPid: testTPID, + Tenant: "cgrates.org", + ID: "FLTR_ACNT_dan", + FilterType: "*string", + FilterFieldName: "Account", + FilterFielValues: []string{"dan"}, + }, + "FLTR_DST_NL": &utils.TPFilter{ + TPid: testTPID, + Tenant: "cgrates.org", + ID: "FLTR_DST_NL", + FilterType: "*destinations", + FilterFieldName: "Destination", + FilterFielValues: []string{"DST_NL"}, + }, + "FLTR_DST_DE": &utils.TPFilter{ + TPid: testTPID, + Tenant: "cgrates.org", + ID: "FLTR_DST_DE", + FilterType: "*destinations", + FilterFieldName: "Destination", + FilterFielValues: []string{"DST_DE"}, + }, + }, + } + if len(csvr.flProfiles["cgrates.org"]) != len(eFilters["cgrates.org"]) { + t.Errorf("Failed to load FilterProfiles: %s", utils.ToIJSON(csvr.flProfiles)) + } else if !reflect.DeepEqual(eFilters["cgrates.org"]["FLTR_1"], csvr.flProfiles["cgrates.org"]["FLTR_1"]) { + t.Errorf("Expecting: %+v, received: %+v", eFilters["cgrates.org"]["FLTR_1"], csvr.flProfiles["cgrates.org"]["FLTR_1"]) + } + } +*/ func TestLoadResource(t *testing.T) { eResources := []*utils.TenantID{ @@ -1561,28 +1604,29 @@ func TestLoadThresholds(t *testing.T) { } } -/* func TestLoadFilters(t *testing.T) { eFilters := []*utils.TenantID{ &utils.TenantID{ Tenant: "cgrates.org", - ID: "Threshold1", + ID: "FLTR_1", }, &utils.TenantID{ Tenant: "cgrates.org", - ID: "Threshold1", + ID: "FLTR_ACNT_dan", }, &utils.TenantID{ Tenant: "cgrates.org", - ID: "Threshold1", + ID: "FLTR_DST_DE", + }, + &utils.TenantID{ + Tenant: "cgrates.org", + ID: "FLTR_DST_NL", }, } if len(csvr.filters) != len(eFilters) { - t.Errorf("Failed to load thresholds: %s", utils.ToIJSON(csvr.thresholds)) - } else if !reflect.DeepEqual(eThresholds, csvr.thresholds) { - t.Errorf("Expecting: %+v, received: %+v", eFilters, csvr.thresholds) + t.Errorf("Failed to load filters: %s", utils.ToIJSON(csvr.filters)) + } else if !reflect.DeepEqual(eFilters, csvr.filters) { + t.Errorf("Expecting: %+v, received: %+v", eFilters, csvr.filters) } - } -*/ diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 1e344e0fd..dbab90660 100755 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -2138,6 +2138,7 @@ func (tps TpThresholdS) AsTPThreshold() (result []*utils.TPThreshold) { Blocker: tp.Blocker, Recurrent: tp.Recurrent, MinSleep: tp.MinSleep, + Async: tp.Async, } } if tp.ActionIDs != "" { @@ -2188,6 +2189,7 @@ func APItoModelTPThreshold(th *utils.TPThreshold) (mdls TpThresholdS) { mdl.Weight = th.Weight mdl.Recurrent = th.Recurrent mdl.MinSleep = th.MinSleep + mdl.Async = th.Async if th.ActivationInterval != nil { if th.ActivationInterval.ActivationTime != "" { mdl.ActivationInterval = th.ActivationInterval.ActivationTime @@ -2226,6 +2228,7 @@ func APItoThresholdProfile(tpTH *utils.TPThreshold, timezone string) (th *Thresh Recurrent: tpTH.Recurrent, Weight: tpTH.Weight, Blocker: tpTH.Blocker, + Async: tpTH.Async, Filters: make([]*Filter, len(tpTH.Filters)), } if tpTH.MinSleep != "" { @@ -2260,13 +2263,17 @@ func (tps TpFilterS) AsTPFilter() (result []*utils.TPFilter) { th, found := mst[tp.ID] if !found { th = &utils.TPFilter{ - TPid: tp.Tpid, - Tenant: tp.Tenant, - ID: tp.ID, - FilterType: tp.Type, - FilterFieldName: tp.Name, + TPid: tp.Tpid, + Tenant: tp.Tenant, + ID: tp.ID, } } + if tp.Type != "" { + th.FilterType = tp.Type + } + if tp.Name != "" { + th.FilterFieldName = tp.Name + } if tp.Values != "" { th.FilterFielValues = append(th.FilterFielValues, strings.Split(tp.Values, utils.INFIELD_SEP)...) } diff --git a/engine/models.go b/engine/models.go index 1f01820e9..82cbad039 100755 --- a/engine/models.go +++ b/engine/models.go @@ -514,6 +514,7 @@ type TpThreshold struct { Blocker bool `index:"8" re:""` Weight float64 `index:"9" re:"\d+\.?\d*"` ActionIDs string `index:"10" re:""` + Async bool `index:"8" re:""` CreatedAt time.Time } diff --git a/engine/storage_map.go b/engine/storage_map.go index 5e9b5c8f8..64aed54fd 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -317,11 +317,7 @@ func (ms *MapStorage) HasData(categ, subject string) (bool, error) { switch categ { case utils.DESTINATION_PREFIX, utils.RATING_PLAN_PREFIX, utils.RATING_PROFILE_PREFIX, utils.ACTION_PREFIX, utils.ACTION_PLAN_PREFIX, utils.ACCOUNT_PREFIX, utils.DERIVEDCHARGERS_PREFIX, -<<<<<<< HEAD - utils.ResourcesPrefix, utils.StatQueuePrefix, utils.ThresholdPrefix: -======= - utils.ResourcesPrefix, utils.StatQueuePrefix, utils.ThresholdProfilePrefix, utils.FilterPrefix: ->>>>>>> Add test in loader_csv_test.go for resources,statQueue,thresholds,filters + utils.ResourcesPrefix, utils.StatQueuePrefix, utils.ThresholdPrefix, utils.FilterPrefix: _, exists := ms.dict[categ+subject] return exists, nil } diff --git a/engine/thresholds.go b/engine/thresholds.go index ac962016d..41fa7c4e5 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -42,6 +42,7 @@ type ThresholdProfile struct { Blocker bool // blocker flag to stop processing on filters matched Weight float64 // Weight to sort the thresholds ActionIDs []string + Async bool } func (tp *ThresholdProfile) TenantID() string { diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 2a0651ff9..f2f6588f5 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -2106,7 +2106,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } if verbose { - log.Print("Filters:") + log.Print("FilterProfile:") } for _, mpID := range tpr.flProfiles { for _, tpTH := range mpID { @@ -2122,6 +2122,17 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } } + if verbose { + log.Print("Filters:") + } + for _, thd := range tpr.filters { + if err = tpr.dataStorage.SetFilter(&Filter{Tenant: thd.Tenant, ID: thd.ID}); err != nil { + return err + } + if verbose { + log.Print("\t", thd.TenantID()) + } + } if verbose { log.Print("Timings:") } @@ -2297,7 +2308,7 @@ func (tpr *TpReader) ShowStatistics() { log.Print("Stats: ", len(tpr.sqProfiles)) // thresholds log.Print("Thresholds: ", len(tpr.thProfiles)) - // thresholds + // filters log.Print("Filters: ", len(tpr.flProfiles)) } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index df349037c..91ac303dd 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1366,6 +1366,7 @@ type TPThreshold struct { Blocker bool // blocker flag to stop processing on filters matched Weight float64 // Weight to sort the thresholds ActionIDs []string + Async bool } type TPFilter struct {