diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index 96512e44a..1dd9d4bd6 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -275,15 +275,9 @@ func testV1STSSetStatQueueProfile(t *testing.T) { t.Error(err) } statConfig = &engine.StatQueueProfile{ - Tenant: "cgrates.org", - ID: "TEST_PROFILE1", - Filters: []*engine.RequestFilter{ - &engine.RequestFilter{ - Type: "type", - FieldName: "Name", - Values: []string{"FilterValue1", "FilterValue2"}, - }, - }, + Tenant: "cgrates.org", + ID: "TEST_PROFILE1", + FilterIDs: []string{"FLTR_1"}, ActivationInterval: &utils.ActivationInterval{ ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), @@ -313,23 +307,7 @@ func testV1STSSetStatQueueProfile(t *testing.T) { func testV1STSUpdateStatQueueProfile(t *testing.T) { var result string - statConfig.Filters = []*engine.RequestFilter{ - &engine.RequestFilter{ - Type: "type", - FieldName: "Name", - Values: []string{"FilterValue1", "FilterValue2"}, - }, - &engine.RequestFilter{ - Type: "*string", - FieldName: "Accout", - Values: []string{"1001", "1002"}, - }, - &engine.RequestFilter{ - Type: "*string_prefix", - FieldName: "Destination", - Values: []string{"10", "20"}, - }, - } + statConfig.FilterIDs = []string{"FLTR_1", "FLTR_2"} if err := stsV1Rpc.Call("ApierV1.SetStatQueueProfile", statConfig, &result); err != nil { t.Error(err) } else if result != utils.OK { diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index 5704966cf..5d79d90e1 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -185,6 +185,7 @@ func testV1TSFromFolder(t *testing.T) { if err := tSv1Rpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { t.Error(err) } + time.Sleep(500 * time.Millisecond) } func testV1TSGetThresholds(t *testing.T) { diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 12473a0ab..2a2ba77b4 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -562,8 +562,10 @@ func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient. // startStatService fires up the StatS func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, - dm *engine.DataManager, server *utils.Server, exitChan chan bool) { + dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { var thdSConn *rpcclient.RpcClientPool + filterS := <-filterSChan + filterSChan <- filterS if len(cfg.StatSCfg().ThresholdSConns) != 0 { // Stats connection init thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.StatSCfg().ThresholdSConns, internalThresholdSChan, cfg.InternalTtl) @@ -573,7 +575,7 @@ func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.R return } } - sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval, thdSConn) + sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval, thdSConn, filterS) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) exitChan <- true @@ -904,7 +906,7 @@ func main() { } if cfg.StatSCfg().Enabled { - go startStatService(internalStatSChan, internalThresholdSChan, cfg, dm, server, exitChan) + go startStatService(internalStatSChan, internalThresholdSChan, cfg, dm, server, exitChan, filterSChan) } if cfg.ThresholdSCfg().Enabled { diff --git a/data/storage/mysql/create_tariffplan_tables.sql b/data/storage/mysql/create_tariffplan_tables.sql index 9508de184..1475836ba 100644 --- a/data/storage/mysql/create_tariffplan_tables.sql +++ b/data/storage/mysql/create_tariffplan_tables.sql @@ -428,9 +428,7 @@ CREATE TABLE tp_stats ( `tpid` varchar(64) NOT NULL, `tenant` varchar(64) NOT NULL, `id` varchar(64) NOT NULL, - `filter_type` varchar(16) NOT NULL, - `filter_field_name` varchar(64) NOT NULL, - `filter_field_values` varchar(256) NOT NULL, + `filter_ids` varchar(64) NOT NULL, `activation_interval` varchar(64) NOT NULL, `queue_length` int(11) NOT NULL, `ttl` varchar(32) NOT NULL, @@ -443,7 +441,7 @@ CREATE TABLE tp_stats ( `created_at` TIMESTAMP, PRIMARY KEY (`pk`), KEY `tpid` (`tpid`), - UNIQUE KEY `unique_tp_stats` (`tpid`, `tenant`, `id`, `filter_type`, `filter_field_name`) + UNIQUE KEY `unique_tp_stats` (`tpid`, `tenant`, `id`, `filter_ids`) ); -- diff --git a/data/storage/postgres/create_tariffplan_tables.sql b/data/storage/postgres/create_tariffplan_tables.sql index a94d66bbe..2fb863c6d 100644 --- a/data/storage/postgres/create_tariffplan_tables.sql +++ b/data/storage/postgres/create_tariffplan_tables.sql @@ -424,9 +424,7 @@ CREATE TABLE tp_stats ( "tpid" varchar(64) NOT NULL, "tenant"varchar(64) NOT NULL, "id" varchar(64) NOT NULL, - "filter_type" varchar(16) NOT NULL, - "filter_field_name" varchar(64) NOT NULL, - "filter_field_values" varchar(256) NOT NULL, + "filter_ids" varchar(64) NOT NULL, "activation_interval" varchar(64) NOT NULL, "queue_length" INTEGER NOT NULL, "ttl" varchar(32) NOT NULL, @@ -439,7 +437,7 @@ CREATE TABLE tp_stats ( "created_at" TIMESTAMP WITH TIME ZONE ); CREATE INDEX tp_stats_idx ON tp_stats (tpid); -CREATE INDEX tp_stats_unique ON tp_stats ("tpid","tenant", "id", "filter_type", "filter_field_name"); +CREATE INDEX tp_stats_unique ON tp_stats ("tpid","tenant", "id", "filter_ids"); -- -- Table structure for table `tp_threshold_cfgs` diff --git a/data/tariffplans/testtp/Stats.csv b/data/tariffplans/testtp/Stats.csv index 6b5853d14..3fcbc13ef 100755 --- a/data/tariffplans/testtp/Stats.csv +++ b/data/tariffplans/testtp/Stats.csv @@ -1,2 +1,2 @@ -#Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],QueueLength[6],TTL[7],Metrics[8],Blocker[9],Stored[10],Weight[11],MinItems[12],Thresholds[13] -cgrates.org,Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acc;*tcc;*acd;*tcd;*pdd,true,true,20,2,THRESH1;THRESH2 +#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],Metrics[6],Blocker[7],Stored[8],Weight[9],MinItems[10],Thresholds[11] +cgrates.org,Stats1,FLTR_1,2014-07-29T15:00:00Z,100,1s,*asr;*acc;*tcc;*acd;*tcd;*pdd,true,true,20,2,THRESH1;THRESH2 diff --git a/data/tariffplans/tutorial/Stats.csv b/data/tariffplans/tutorial/Stats.csv index 6b5853d14..3fcbc13ef 100755 --- a/data/tariffplans/tutorial/Stats.csv +++ b/data/tariffplans/tutorial/Stats.csv @@ -1,2 +1,2 @@ -#Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],QueueLength[6],TTL[7],Metrics[8],Blocker[9],Stored[10],Weight[11],MinItems[12],Thresholds[13] -cgrates.org,Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acc;*tcc;*acd;*tcd;*pdd,true,true,20,2,THRESH1;THRESH2 +#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],Metrics[6],Blocker[7],Stored[8],Weight[9],MinItems[10],Thresholds[11] +cgrates.org,Stats1,FLTR_1,2014-07-29T15:00:00Z,100,1s,*asr;*acc;*tcc;*acd;*tcd;*pdd,true,true,20,2,THRESH1;THRESH2 diff --git a/engine/libstats.go b/engine/libstats.go index 989f9f059..8a94e5b9a 100755 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -31,7 +31,7 @@ import ( type StatQueueProfile struct { Tenant string ID string // QueueID - Filters []*RequestFilter + FilterIDs []string ActivationInterval *utils.ActivationInterval // Activation interval QueueLength int TTL time.Duration diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index a43ce2350..8554e2ffc 100755 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -272,8 +272,8 @@ cgrates.org,ResGroup21,*rsr_fields,,HdrSubject(~^1.*1$);HdrDestination(1002),,,, cgrates.org,ResGroup22,*destinations,HdrDestination,DST_FS,2014-07-29T15:00:00Z,3600s,2,premium_call,true,true,10, ` stats = ` -#Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],QueueLength[6],TTL[7],Metrics[8],Blocker[9],Stored[10],Weight[11],MinItems[12],Thresholds[13] -cgrates.org,Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acc;*tcc;*acd;*tcd;*pdd,true,true,20,2,THRESH1;THRESH2 +#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],Metrics[6],Blocker[7],Stored[8],Weight[9],MinItems[10],Thresholds[11] +cgrates.org,Stats1,FLTR_1,2014-07-29T15:00:00Z,100,1s,*asr;*acc;*tcc;*acd;*tcd;*pdd,true,true,20,2,THRESH1;THRESH2 ` thresholds = ` @@ -1449,12 +1449,10 @@ func TestLoadResourceProfiles(t *testing.T) { func TestLoadStatProfiles(t *testing.T) { eStats := map[utils.TenantID]*utils.TPStats{ utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}: &utils.TPStats{ - Tenant: "cgrates.org", - TPid: testTPID, - ID: "Stats1", - Filters: []*utils.TPRequestFilter{ - &utils.TPRequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}}, - }, + Tenant: "cgrates.org", + TPid: testTPID, + ID: "Stats1", + FilterIDs: []string{"FLTR_1"}, ActivationInterval: &utils.TPActivationInterval{ ActivationTime: "2014-07-29T15:00:00Z", }, diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 78c100d71..6a6930b4f 100755 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -2016,12 +2016,13 @@ func (tps TpStatsS) AsTPStats() (result []*utils.TPStats) { st.ActivationInterval.ActivationTime = aiSplt[0] } } - if tp.FilterType != "" { - st.Filters = append(st.Filters, &utils.TPRequestFilter{ - Type: tp.FilterType, - FieldName: tp.FilterFieldName, - Values: strings.Split(tp.FilterFieldValues, utils.INFIELD_SEP)}) + if tp.FilterIDs != "" { + filterSplit := strings.Split(tp.FilterIDs, utils.INFIELD_SEP) + for _, filter := range filterSplit { + st.FilterIDs = append(st.FilterIDs, filter) + } } + mst[tp.ID] = st } result = make([]*utils.TPStats, len(mst)) @@ -2034,53 +2035,45 @@ func (tps TpStatsS) AsTPStats() (result []*utils.TPStats) { } func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) { - if len(st.Filters) == 0 { - return - } - for i, fltr := range st.Filters { - mdl := &TpStats{ - Tenant: st.Tenant, - Tpid: st.TPid, - ID: st.ID, - MinItems: st.MinItems, + if st != nil { + for i, fltr := range st.FilterIDs { + mdl := &TpStats{ + Tenant: st.Tenant, + Tpid: st.TPid, + ID: st.ID, + MinItems: st.MinItems, + } + if i == 0 { + mdl.TTL = st.TTL + mdl.Blocker = st.Blocker + mdl.Stored = st.Stored + mdl.Weight = st.Weight + mdl.QueueLength = st.QueueLength + mdl.MinItems = st.MinItems + for i, val := range st.Metrics { + if i != 0 { + mdl.Metrics += utils.INFIELD_SEP + } + mdl.Metrics += val + } + for i, val := range st.Thresholds { + if i != 0 { + mdl.Thresholds += utils.INFIELD_SEP + } + mdl.Thresholds += val + } + if st.ActivationInterval != nil { + if st.ActivationInterval.ActivationTime != "" { + mdl.ActivationInterval = st.ActivationInterval.ActivationTime + } + if st.ActivationInterval.ExpiryTime != "" { + mdl.ActivationInterval += utils.INFIELD_SEP + st.ActivationInterval.ExpiryTime + } + } + } + mdl.FilterIDs = fltr + mdls = append(mdls, mdl) } - if i == 0 { - mdl.TTL = st.TTL - mdl.Blocker = st.Blocker - mdl.Stored = st.Stored - mdl.Weight = st.Weight - mdl.QueueLength = st.QueueLength - mdl.MinItems = st.MinItems - for i, val := range st.Metrics { - if i != 0 { - mdl.Metrics += utils.INFIELD_SEP - } - mdl.Metrics += val - } - for i, val := range st.Thresholds { - if i != 0 { - mdl.Thresholds += utils.INFIELD_SEP - } - mdl.Thresholds += val - } - if st.ActivationInterval != nil { - if st.ActivationInterval.ActivationTime != "" { - mdl.ActivationInterval = st.ActivationInterval.ActivationTime - } - if st.ActivationInterval.ExpiryTime != "" { - mdl.ActivationInterval += utils.INFIELD_SEP + st.ActivationInterval.ExpiryTime - } - } - } - mdl.FilterType = fltr.Type - mdl.FilterFieldName = fltr.FieldName - for i, val := range fltr.Values { - if i != 0 { - mdl.FilterFieldValues += utils.INFIELD_SEP - } - mdl.FilterFieldValues += val - } - mdls = append(mdls, mdl) } return } @@ -2094,7 +2087,6 @@ func APItoStats(tpST *utils.TPStats, timezone string) (st *StatQueueProfile, err Blocker: tpST.Blocker, Stored: tpST.Stored, MinItems: tpST.MinItems, - Filters: make([]*RequestFilter, len(tpST.Filters)), } if tpST.TTL != "" { if st.TTL, err = utils.ParseDurationWithSecs(tpST.TTL); err != nil { @@ -2108,12 +2100,8 @@ func APItoStats(tpST *utils.TPStats, timezone string) (st *StatQueueProfile, err st.Thresholds = append(st.Thresholds, trh) } - for i, f := range tpST.Filters { - rf := &RequestFilter{Type: f.Type, FieldName: f.FieldName, Values: f.Values} - if err := rf.CompileValues(); err != nil { - return nil, err - } - st.Filters[i] = rf + for _, fltr := range tpST.FilterIDs { + st.FilterIDs = append(st.FilterIDs, fltr) } if tpST.ActivationInterval != nil { if st.ActivationInterval, err = tpST.ActivationInterval.AsActivationInterval(timezone); err != nil { diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index 9a0b4f053..4b9d5da0e 100755 --- a/engine/model_helpers_test.go +++ b/engine/model_helpers_test.go @@ -847,9 +847,7 @@ func TestTPStatsAsTPStats(t *testing.T) { &TpStats{ Tpid: "TEST_TPID", ID: "Stats1", - FilterType: MetaStringPrefix, - FilterFieldName: "Account", - FilterFieldValues: "1001;1002", + FilterIDs: "FLTR_1", ActivationInterval: "2014-07-29T15:00:00Z", QueueLength: 100, TTL: "1s", @@ -863,15 +861,9 @@ func TestTPStatsAsTPStats(t *testing.T) { } eTPs := []*utils.TPStats{ &utils.TPStats{ - TPid: tps[0].Tpid, - ID: tps[0].ID, - Filters: []*utils.TPRequestFilter{ - &utils.TPRequestFilter{ - Type: tps[0].FilterType, - FieldName: tps[0].FilterFieldName, - Values: []string{"1001", "1002"}, - }, - }, + TPid: tps[0].Tpid, + ID: tps[0].ID, + FilterIDs: []string{"FLTR_1"}, ActivationInterval: &utils.TPActivationInterval{ ActivationTime: tps[0].ActivationInterval, }, @@ -893,11 +885,9 @@ func TestTPStatsAsTPStats(t *testing.T) { func TestAPItoTPStats(t *testing.T) { tps := &utils.TPStats{ - TPid: testTPID, - ID: "Stats1", - Filters: []*utils.TPRequestFilter{ - &utils.TPRequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}}, - }, + TPid: testTPID, + ID: "Stats1", + FilterIDs: []string{"FLTR_1"}, ActivationInterval: &utils.TPActivationInterval{ActivationTime: "2014-07-29T15:00:00Z"}, QueueLength: 100, TTL: "1s", @@ -913,7 +903,7 @@ func TestAPItoTPStats(t *testing.T) { QueueLength: tps.QueueLength, Metrics: []string{"*asr", "*acd", "*acc"}, Thresholds: []string{"THRESH1", "THRESH2"}, - Filters: make([]*RequestFilter, len(tps.Filters)), + FilterIDs: []string{"FLTR_1"}, Stored: tps.Stored, Blocker: tps.Blocker, Weight: 20.0, @@ -922,9 +912,6 @@ func TestAPItoTPStats(t *testing.T) { if eTPs.TTL, err = utils.ParseDurationWithSecs(tps.TTL); err != nil { t.Errorf("Got error: %+v", err) } - - eTPs.Filters[0] = &RequestFilter{Type: MetaString, - FieldName: "Account", Values: []string{"1001", "1002"}} at, _ := utils.ParseTimeDetectLayout("2014-07-29T15:00:00Z", "UTC") eTPs.ActivationInterval = &utils.ActivationInterval{ActivationTime: at} diff --git a/engine/models.go b/engine/models.go index abe6e3c25..20e01e962 100755 --- a/engine/models.go +++ b/engine/models.go @@ -485,18 +485,16 @@ type TpStats struct { Tpid string Tenant string `index:"0" re:""` ID string `index:"1" re:""` - FilterType string `index:"2" re:"^\*[A-Za-z].*"` - FilterFieldName string `index:"3" re:""` - FilterFieldValues string `index:"4" re:""` - ActivationInterval string `index:"5" re:""` - QueueLength int `index:"6" re:""` - TTL string `index:"7" re:""` - Metrics string `index:"8" re:""` - Blocker bool `index:"9" re:""` - Stored bool `index:"10" re:""` - Weight float64 `index:"11" re:"\d+\.?\d*"` - MinItems int `index:"12" re:""` - Thresholds string `index:"13" re:""` + FilterIDs string `index:"2" re:""` + ActivationInterval string `index:"3" re:""` + QueueLength int `index:"4" re:""` + TTL string `index:"5" re:""` + Metrics string `index:"6" re:""` + Blocker bool `index:"7" re:""` + Stored bool `index:"8" re:""` + Weight float64 `index:"9" re:"\d+\.?\d*"` + MinItems int `index:"10" re:""` + Thresholds string `index:"11" re:""` CreatedAt time.Time } diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index c979df54b..a2f5daa30 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -1967,7 +1967,7 @@ func testOnStorITCRUDStatQueueProfile(t *testing.T) { sq := &StatQueueProfile{ ID: "test", ActivationInterval: &utils.ActivationInterval{}, - Filters: []*RequestFilter{}, + FilterIDs: []string{}, QueueLength: 2, TTL: timeTTL, Metrics: []string{}, diff --git a/engine/stats.go b/engine/stats.go index c0d35406a..dbe25b9ea 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -34,7 +34,7 @@ import ( // NewStatService initializes a StatService func NewStatService(dm *DataManager, storeInterval time.Duration, - thdS rpcclient.RpcClientConnection) (ss *StatService, err error) { + thdS rpcclient.RpcClientConnection, filterS *FilterS) (ss *StatService, err error) { if thdS != nil && reflect.ValueOf(thdS).IsNil() { // fix nil value in interface thdS = nil } @@ -42,6 +42,7 @@ func NewStatService(dm *DataManager, storeInterval time.Duration, dm: dm, storeInterval: storeInterval, thdS: thdS, + filterS: filterS, storedStatQueues: make(utils.StringMap), stopBackup: make(chan struct{})}, nil } @@ -51,6 +52,7 @@ type StatService struct { dm *DataManager storeInterval time.Duration thdS rpcclient.RpcClientConnection // rpc connection towards ThresholdS + filterS *FilterS stopBackup chan struct{} storedStatQueues utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool ssqMux sync.RWMutex // protects storedStatQueues @@ -156,16 +158,9 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues !sqPrfl.ActivationInterval.IsActiveAtTime(time.Now()) { // not active continue } - passAllFilters := true - for _, fltr := range sqPrfl.Filters { - if pass, err := fltr.Pass(ev.Event, "", sS); err != nil { - return nil, err - } else if !pass { - passAllFilters = false - continue - } - } - if !passAllFilters { + if pass, err := sS.filterS.PassFiltersForEvent(ev.Tenant, ev.Event, sqPrfl.FilterIDs); err != nil { + return nil, err + } else if !pass { continue } s, err := sS.dm.GetStatQueue(sqPrfl.Tenant, sqPrfl.ID, false, "") diff --git a/engine/storage_map.go b/engine/storage_map.go index 538a4079e..c902afdd3 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1222,11 +1222,6 @@ func (ms *MapStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *Stat if err != nil { return nil, err } - for _, fltr := range sq.Filters { - if err := fltr.CompileValues(); err != nil { - return nil, err - } - } return } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index f8f9ef092..22d47a9d5 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1824,11 +1824,6 @@ func (ms *MongoStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *St } return nil, err } - for _, fltr := range sq.Filters { - if err = fltr.CompileValues(); err != nil { - return - } - } return } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index a8953cc1d..b3138d5c7 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1374,11 +1374,6 @@ func (rs *RedisStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *St if err = rs.ms.Unmarshal(values, &sq); err != nil { return } - for _, fltr := range sq.Filters { - if err = fltr.CompileValues(); err != nil { - return - } - } return } diff --git a/engine/stordb_it_test.go b/engine/stordb_it_test.go index 5488b3888..be34cb3a5 100755 --- a/engine/stordb_it_test.go +++ b/engine/stordb_it_test.go @@ -1560,16 +1560,10 @@ func testStorDBitCRUDTpStats(t *testing.T) { //WRITE eTPs := []*utils.TPStats{ &utils.TPStats{ - TPid: "TEST_TPID", - Tenant: "Test", - ID: "Stats1", - Filters: []*utils.TPRequestFilter{ - &utils.TPRequestFilter{ - Type: "filtertype", - FieldName: "Account", - Values: []string{"1001", "1002"}, - }, - }, + TPid: "TEST_TPID", + Tenant: "Test", + ID: "Stats1", + FilterIDs: []string{"FLTR_1"}, ActivationInterval: &utils.TPActivationInterval{ ActivationTime: "2014-07-29T15:00:00Z", }, @@ -1601,11 +1595,7 @@ func testStorDBitCRUDTpStats(t *testing.T) { if !(reflect.DeepEqual(eTPs[0].Weight, rcv[0].Weight) || reflect.DeepEqual(eTPs[0].Weight, rcv[1].Weight)) { t.Errorf("Expecting: %+v, received: %+v || %+v", eTPs[0].Weight, rcv[0].Weight, rcv[1].Weight) } - for i, _ := range eTPs[0].Filters { - if !(reflect.DeepEqual(eTPs[0].Filters[i], rcv[0].Filters[i]) || reflect.DeepEqual(eTPs[0].Filters[i], rcv[1].Filters[i])) { - t.Errorf("Expecting: %+v, received: %+v || %+v", eTPs[0].Filters[i], rcv[0].Filters[i], rcv[1].Filters[i]) - } - } + } // UPDATE eTPs[0].Weight = 2.1 diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 857179312..a5aeb62da 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -64,6 +64,7 @@ type TpReader struct { revAliases, acntActionPlans map[string][]string thdsIndexers map[string]*ReqFilterIndexer // tenant, indexer + sqpIndexers map[string]*ReqFilterIndexer // tenant, indexer } func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string) *TpReader { @@ -139,6 +140,7 @@ func (tpr *TpReader) Init() { tpr.revAliases = make(map[string][]string) tpr.acntActionPlans = make(map[string][]string) tpr.thdsIndexers = make(map[string]*ReqFilterIndexer) + tpr.sqpIndexers = make(map[string]*ReqFilterIndexer) } func (tpr *TpReader) LoadDestinationsFiltered(tag string) (bool, error) { @@ -1622,7 +1624,7 @@ func (tpr *TpReader) LoadResourceProfiles() error { return tpr.LoadResourceProfilesFiltered("") } -func (tpr *TpReader) LoadStatsFiltered(tag string) error { +func (tpr *TpReader) LoadStatsFiltered(tag string) (err error) { tps, err := tpr.lr.GetTPStats(tpr.tpid, tag) if err != nil { return err @@ -1632,12 +1634,35 @@ func (tpr *TpReader) LoadStatsFiltered(tag string) error { mapSTs[utils.TenantID{Tenant: st.Tenant, ID: st.ID}] = st } tpr.sqProfiles = mapSTs - for tenantid, _ := range mapSTs { + for tenantid, sq := range mapSTs { + sqpIndxrKey := utils.StatQueuesStringIndex + tenantid.TenantID() if has, err := tpr.dm.DataDB().HasData(utils.StatQueuePrefix, tenantid.TenantID()); err != nil { return err } else if !has { tpr.statQueues = append(tpr.statQueues, &utils.TenantID{Tenant: tenantid.Tenant, ID: tenantid.ID}) } + // index statQueues for filters + if _, has := tpr.sqpIndexers[tenantid.TenantID()]; !has { + if tpr.sqpIndexers[tenantid.TenantID()], err = NewReqFilterIndexer(tpr.dm, sqpIndxrKey); err != nil { + return + } + } + for _, fltrID := range sq.FilterIDs { + tpFltr, has := tpr.filters[utils.TenantID{Tenant: tenantid.Tenant, ID: fltrID}] + if !has { + var fltr *Filter + if fltr, err = tpr.dm.GetFilter(tenantid.Tenant, fltrID, false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %s for statQueue: %s", fltrID, sq) + } + return + } else { + tpFltr = FilterToTPFilter(fltr) + } + } else { + tpr.sqpIndexers[tenantid.TenantID()].IndexTPFilter(tpFltr, sq.ID) + } + } } return nil } @@ -1653,7 +1678,7 @@ func (tpr *TpReader) LoadThresholdsFiltered(tag string) (err error) { } mapTHs := make(map[utils.TenantID]*utils.TPThreshold) for _, th := range tps { - mapTHs[utils.TenantID{th.Tenant, th.ID}] = th + mapTHs[utils.TenantID{Tenant: th.Tenant, ID: th.ID}] = th } tpr.thProfiles = mapTHs for tntID, th := range mapTHs { @@ -1700,7 +1725,7 @@ func (tpr *TpReader) LoadFiltersFiltered(tag string) error { } mapTHs := make(map[utils.TenantID]*utils.TPFilter) for _, th := range tps { - mapTHs[utils.TenantID{th.Tenant, th.ID}] = th + mapTHs[utils.TenantID{Tenant: th.Tenant, ID: th.ID}] = th } tpr.filters = mapTHs return nil @@ -2163,28 +2188,19 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } } - if len(tpr.sqProfiles) > 0 { - if verbose { - log.Print("Indexing stats") + + if verbose { + log.Print("StatQueue filter indexes:") + } + for tenant, fltrIdxer := range tpr.sqpIndexers { + if err := fltrIdxer.StoreIndexes(); err != nil { + return err } - for tenantid, st := range tpr.sqProfiles { - stIdxr, err := NewReqFilterIndexer(tpr.dm, utils.StatQueuesStringIndex+tenantid.Tenant) - if err != nil { - return err - } - if st, err := APItoStats(st, tpr.timezone); err != nil { - return err - } else { - stIdxr.IndexFilters(st.ID, st.Filters) - } - if verbose { - log.Printf("Indexed Stats tenant: %s, keys %+v", tenantid.Tenant, stIdxr.ChangedKeys().Slice()) - } - if err := stIdxr.StoreIndexes(); err != nil { - return err - } + if verbose { + log.Printf("Tenant: %s, keys %+v", tenant, fltrIdxer.ChangedKeys().Slice()) } } + if verbose { log.Print("Threshold filter indexes:") } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index e8bf330ac..a43de75be 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1343,7 +1343,7 @@ type TPStats struct { TPid string Tenant string ID string - Filters []*TPRequestFilter + FilterIDs []string ActivationInterval *TPActivationInterval QueueLength int TTL string