Add FilterIDs in StatQueue and FilterS in StatS
committed by Dan Christian Bogos
parent 4e3b45f54a
commit c16ce4ba29
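In short, StatQueueProfile no longer embeds its own RequestFilter list: it carries FilterIDs referencing shared Filter objects, and StatService receives a FilterS dependency that resolves those IDs when matching events. A minimal sketch of the new wiring, using identifiers from this diff (the snippet itself is illustrative, not part of the commit):

    // Sketch only: a profile now references shared filters by ID.
    sqp := &engine.StatQueueProfile{
        Tenant:    "cgrates.org",
        ID:        "Stats1",
        FilterIDs: []string{"FLTR_1"}, // resolved through FilterS at matching time
    }
    _ = sqp
    // matchingStatQueuesForEvent now checks
    //   pass, err := sS.filterS.PassFiltersForEvent(ev.Tenant, ev.Event, sqPrfl.FilterIDs)
    // instead of iterating sqPrfl.Filters and calling fltr.Pass(...) on each one.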
@@ -275,15 +275,9 @@ func testV1STSSetStatQueueProfile(t *testing.T) {
t.Error(err)
}
statConfig = &engine.StatQueueProfile{
Tenant: "cgrates.org",
ID: "TEST_PROFILE1",
Filters: []*engine.RequestFilter{
&engine.RequestFilter{
Type: "type",
FieldName: "Name",
Values: []string{"FilterValue1", "FilterValue2"},
},
},
Tenant: "cgrates.org",
ID: "TEST_PROFILE1",
FilterIDs: []string{"FLTR_1"},
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
@@ -313,23 +307,7 @@ func testV1STSSetStatQueueProfile(t *testing.T) {

func testV1STSUpdateStatQueueProfile(t *testing.T) {
var result string
statConfig.Filters = []*engine.RequestFilter{
&engine.RequestFilter{
Type: "type",
FieldName: "Name",
Values: []string{"FilterValue1", "FilterValue2"},
},
&engine.RequestFilter{
Type: "*string",
FieldName: "Accout",
Values: []string{"1001", "1002"},
},
&engine.RequestFilter{
Type: "*string_prefix",
FieldName: "Destination",
Values: []string{"10", "20"},
},
}
statConfig.FilterIDs = []string{"FLTR_1", "FLTR_2"}
if err := stsV1Rpc.Call("ApierV1.SetStatQueueProfile", statConfig, &result); err != nil {
t.Error(err)
} else if result != utils.OK {

@@ -185,6 +185,7 @@ func testV1TSFromFolder(t *testing.T) {
if err := tSv1Rpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
t.Error(err)
}
time.Sleep(500 * time.Millisecond)
}

func testV1TSGetThresholds(t *testing.T) {
@@ -562,8 +562,10 @@ func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient.

// startStatService fires up the StatS
func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dm *engine.DataManager, server *utils.Server, exitChan chan bool) {
dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
var thdSConn *rpcclient.RpcClientPool
filterS := <-filterSChan
filterSChan <- filterS
if len(cfg.StatSCfg().ThresholdSConns) != 0 { // Stats connection init
thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.StatSCfg().ThresholdSConns, internalThresholdSChan, cfg.InternalTtl)
@@ -573,7 +575,7 @@ func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.R
return
}
}
sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval, thdSConn)
sS, err := engine.NewStatService(dm, cfg.StatSCfg().StoreInterval, thdSConn, filterS)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not init, error: %s", err.Error()))
exitChan <- true
@@ -904,7 +906,7 @@ func main() {
}

if cfg.StatSCfg().Enabled {
go startStatService(internalStatSChan, internalThresholdSChan, cfg, dm, server, exitChan)
go startStatService(internalStatSChan, internalThresholdSChan, cfg, dm, server, exitChan, filterSChan)
}

if cfg.ThresholdSCfg().Enabled {
@@ -428,9 +428,7 @@ CREATE TABLE tp_stats (
`tpid` varchar(64) NOT NULL,
`tenant` varchar(64) NOT NULL,
`id` varchar(64) NOT NULL,
`filter_type` varchar(16) NOT NULL,
`filter_field_name` varchar(64) NOT NULL,
`filter_field_values` varchar(256) NOT NULL,
`filter_ids` varchar(64) NOT NULL,
`activation_interval` varchar(64) NOT NULL,
`queue_length` int(11) NOT NULL,
`ttl` varchar(32) NOT NULL,
@@ -443,7 +441,7 @@ CREATE TABLE tp_stats (
`created_at` TIMESTAMP,
PRIMARY KEY (`pk`),
KEY `tpid` (`tpid`),
UNIQUE KEY `unique_tp_stats` (`tpid`, `tenant`, `id`, `filter_type`, `filter_field_name`)
UNIQUE KEY `unique_tp_stats` (`tpid`, `tenant`, `id`, `filter_ids`)
);

--

@@ -424,9 +424,7 @@ CREATE TABLE tp_stats (
"tpid" varchar(64) NOT NULL,
"tenant"varchar(64) NOT NULL,
"id" varchar(64) NOT NULL,
"filter_type" varchar(16) NOT NULL,
"filter_field_name" varchar(64) NOT NULL,
"filter_field_values" varchar(256) NOT NULL,
"filter_ids" varchar(64) NOT NULL,
"activation_interval" varchar(64) NOT NULL,
"queue_length" INTEGER NOT NULL,
"ttl" varchar(32) NOT NULL,
@@ -439,7 +437,7 @@ CREATE TABLE tp_stats (
"created_at" TIMESTAMP WITH TIME ZONE
);
CREATE INDEX tp_stats_idx ON tp_stats (tpid);
CREATE INDEX tp_stats_unique ON tp_stats ("tpid","tenant", "id", "filter_type", "filter_field_name");
CREATE INDEX tp_stats_unique ON tp_stats ("tpid","tenant", "id", "filter_ids");

--
-- Table structure for table `tp_threshold_cfgs`
@@ -1,2 +1,2 @@
#Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],QueueLength[6],TTL[7],Metrics[8],Blocker[9],Stored[10],Weight[11],MinItems[12],Thresholds[13]
cgrates.org,Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acc;*tcc;*acd;*tcd;*pdd,true,true,20,2,THRESH1;THRESH2
#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],Metrics[6],Blocker[7],Stored[8],Weight[9],MinItems[10],Thresholds[11]
cgrates.org,Stats1,FLTR_1,2014-07-29T15:00:00Z,100,1s,*asr;*acc;*tcc;*acd;*tcd;*pdd,true,true,20,2,THRESH1;THRESH2

@@ -1,2 +1,2 @@
#Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],QueueLength[6],TTL[7],Metrics[8],Blocker[9],Stored[10],Weight[11],MinItems[12],Thresholds[13]
cgrates.org,Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acc;*tcc;*acd;*tcd;*pdd,true,true,20,2,THRESH1;THRESH2
#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],Metrics[6],Blocker[7],Stored[8],Weight[9],MinItems[10],Thresholds[11]
cgrates.org,Stats1,FLTR_1,2014-07-29T15:00:00Z,100,1s,*asr;*acc;*tcc;*acd;*tcd;*pdd,true,true,20,2,THRESH1;THRESH2
@@ -31,7 +31,7 @@ import (
type StatQueueProfile struct {
Tenant string
ID string // QueueID
Filters []*RequestFilter
FilterIDs []string
ActivationInterval *utils.ActivationInterval // Activation interval
QueueLength int
TTL time.Duration
@@ -272,8 +272,8 @@ cgrates.org,ResGroup21,*rsr_fields,,HdrSubject(~^1.*1$);HdrDestination(1002),,,,
cgrates.org,ResGroup22,*destinations,HdrDestination,DST_FS,2014-07-29T15:00:00Z,3600s,2,premium_call,true,true,10,
`
stats = `
#Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],QueueLength[6],TTL[7],Metrics[8],Blocker[9],Stored[10],Weight[11],MinItems[12],Thresholds[13]
cgrates.org,Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acc;*tcc;*acd;*tcd;*pdd,true,true,20,2,THRESH1;THRESH2
#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],Metrics[6],Blocker[7],Stored[8],Weight[9],MinItems[10],Thresholds[11]
cgrates.org,Stats1,FLTR_1,2014-07-29T15:00:00Z,100,1s,*asr;*acc;*tcc;*acd;*tcd;*pdd,true,true,20,2,THRESH1;THRESH2
`

thresholds = `
@@ -1449,12 +1449,10 @@ func TestLoadResourceProfiles(t *testing.T) {
func TestLoadStatProfiles(t *testing.T) {
eStats := map[utils.TenantID]*utils.TPStats{
utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}: &utils.TPStats{
Tenant: "cgrates.org",
TPid: testTPID,
ID: "Stats1",
Filters: []*utils.TPRequestFilter{
&utils.TPRequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}},
},
Tenant: "cgrates.org",
TPid: testTPID,
ID: "Stats1",
FilterIDs: []string{"FLTR_1"},
ActivationInterval: &utils.TPActivationInterval{
ActivationTime: "2014-07-29T15:00:00Z",
},
@@ -2016,12 +2016,13 @@ func (tps TpStatsS) AsTPStats() (result []*utils.TPStats) {
st.ActivationInterval.ActivationTime = aiSplt[0]
}
}
if tp.FilterType != "" {
st.Filters = append(st.Filters, &utils.TPRequestFilter{
Type: tp.FilterType,
FieldName: tp.FilterFieldName,
Values: strings.Split(tp.FilterFieldValues, utils.INFIELD_SEP)})
if tp.FilterIDs != "" {
filterSplit := strings.Split(tp.FilterIDs, utils.INFIELD_SEP)
for _, filter := range filterSplit {
st.FilterIDs = append(st.FilterIDs, filter)
}
}

mst[tp.ID] = st
}
result = make([]*utils.TPStats, len(mst))
@@ -2034,53 +2035,45 @@ func (tps TpStatsS) AsTPStats() (result []*utils.TPStats) {
}

func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) {
if len(st.Filters) == 0 {
return
}
for i, fltr := range st.Filters {
mdl := &TpStats{
Tenant: st.Tenant,
Tpid: st.TPid,
ID: st.ID,
MinItems: st.MinItems,
if st != nil {
for i, fltr := range st.FilterIDs {
mdl := &TpStats{
Tenant: st.Tenant,
Tpid: st.TPid,
ID: st.ID,
MinItems: st.MinItems,
}
if i == 0 {
mdl.TTL = st.TTL
mdl.Blocker = st.Blocker
mdl.Stored = st.Stored
mdl.Weight = st.Weight
mdl.QueueLength = st.QueueLength
mdl.MinItems = st.MinItems
for i, val := range st.Metrics {
if i != 0 {
mdl.Metrics += utils.INFIELD_SEP
}
mdl.Metrics += val
}
for i, val := range st.Thresholds {
if i != 0 {
mdl.Thresholds += utils.INFIELD_SEP
}
mdl.Thresholds += val
}
if st.ActivationInterval != nil {
if st.ActivationInterval.ActivationTime != "" {
mdl.ActivationInterval = st.ActivationInterval.ActivationTime
}
if st.ActivationInterval.ExpiryTime != "" {
mdl.ActivationInterval += utils.INFIELD_SEP + st.ActivationInterval.ExpiryTime
}
}
}
mdl.FilterIDs = fltr
mdls = append(mdls, mdl)
}
if i == 0 {
mdl.TTL = st.TTL
mdl.Blocker = st.Blocker
mdl.Stored = st.Stored
mdl.Weight = st.Weight
mdl.QueueLength = st.QueueLength
mdl.MinItems = st.MinItems
for i, val := range st.Metrics {
if i != 0 {
mdl.Metrics += utils.INFIELD_SEP
}
mdl.Metrics += val
}
for i, val := range st.Thresholds {
if i != 0 {
mdl.Thresholds += utils.INFIELD_SEP
}
mdl.Thresholds += val
}
if st.ActivationInterval != nil {
if st.ActivationInterval.ActivationTime != "" {
mdl.ActivationInterval = st.ActivationInterval.ActivationTime
}
if st.ActivationInterval.ExpiryTime != "" {
mdl.ActivationInterval += utils.INFIELD_SEP + st.ActivationInterval.ExpiryTime
}
}
}
mdl.FilterType = fltr.Type
mdl.FilterFieldName = fltr.FieldName
for i, val := range fltr.Values {
if i != 0 {
mdl.FilterFieldValues += utils.INFIELD_SEP
}
mdl.FilterFieldValues += val
}
mdls = append(mdls, mdl)
}
return
}
@@ -2094,7 +2087,6 @@ func APItoStats(tpST *utils.TPStats, timezone string) (st *StatQueueProfile, err
Blocker: tpST.Blocker,
Stored: tpST.Stored,
MinItems: tpST.MinItems,
Filters: make([]*RequestFilter, len(tpST.Filters)),
}
if tpST.TTL != "" {
if st.TTL, err = utils.ParseDurationWithSecs(tpST.TTL); err != nil {
@@ -2108,12 +2100,8 @@ func APItoStats(tpST *utils.TPStats, timezone string) (st *StatQueueProfile, err
st.Thresholds = append(st.Thresholds, trh)

}
for i, f := range tpST.Filters {
rf := &RequestFilter{Type: f.Type, FieldName: f.FieldName, Values: f.Values}
if err := rf.CompileValues(); err != nil {
return nil, err
}
st.Filters[i] = rf
for _, fltr := range tpST.FilterIDs {
st.FilterIDs = append(st.FilterIDs, fltr)
}
if tpST.ActivationInterval != nil {
if st.ActivationInterval, err = tpST.ActivationInterval.AsActivationInterval(timezone); err != nil {
@@ -847,9 +847,7 @@ func TestTPStatsAsTPStats(t *testing.T) {
&TpStats{
Tpid: "TEST_TPID",
ID: "Stats1",
FilterType: MetaStringPrefix,
FilterFieldName: "Account",
FilterFieldValues: "1001;1002",
FilterIDs: "FLTR_1",
ActivationInterval: "2014-07-29T15:00:00Z",
QueueLength: 100,
TTL: "1s",
@@ -863,15 +861,9 @@ func TestTPStatsAsTPStats(t *testing.T) {
}
eTPs := []*utils.TPStats{
&utils.TPStats{
TPid: tps[0].Tpid,
ID: tps[0].ID,
Filters: []*utils.TPRequestFilter{
&utils.TPRequestFilter{
Type: tps[0].FilterType,
FieldName: tps[0].FilterFieldName,
Values: []string{"1001", "1002"},
},
},
TPid: tps[0].Tpid,
ID: tps[0].ID,
FilterIDs: []string{"FLTR_1"},
ActivationInterval: &utils.TPActivationInterval{
ActivationTime: tps[0].ActivationInterval,
},
@@ -893,11 +885,9 @@ func TestTPStatsAsTPStats(t *testing.T) {

func TestAPItoTPStats(t *testing.T) {
tps := &utils.TPStats{
TPid: testTPID,
ID: "Stats1",
Filters: []*utils.TPRequestFilter{
&utils.TPRequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}},
},
TPid: testTPID,
ID: "Stats1",
FilterIDs: []string{"FLTR_1"},
ActivationInterval: &utils.TPActivationInterval{ActivationTime: "2014-07-29T15:00:00Z"},
QueueLength: 100,
TTL: "1s",
@@ -913,7 +903,7 @@ func TestAPItoTPStats(t *testing.T) {
QueueLength: tps.QueueLength,
Metrics: []string{"*asr", "*acd", "*acc"},
Thresholds: []string{"THRESH1", "THRESH2"},
Filters: make([]*RequestFilter, len(tps.Filters)),
FilterIDs: []string{"FLTR_1"},
Stored: tps.Stored,
Blocker: tps.Blocker,
Weight: 20.0,
@@ -922,9 +912,6 @@ func TestAPItoTPStats(t *testing.T) {
if eTPs.TTL, err = utils.ParseDurationWithSecs(tps.TTL); err != nil {
t.Errorf("Got error: %+v", err)
}

eTPs.Filters[0] = &RequestFilter{Type: MetaString,
FieldName: "Account", Values: []string{"1001", "1002"}}
at, _ := utils.ParseTimeDetectLayout("2014-07-29T15:00:00Z", "UTC")
eTPs.ActivationInterval = &utils.ActivationInterval{ActivationTime: at}
@@ -485,18 +485,16 @@ type TpStats struct {
Tpid string
Tenant string `index:"0" re:""`
ID string `index:"1" re:""`
FilterType string `index:"2" re:"^\*[A-Za-z].*"`
FilterFieldName string `index:"3" re:""`
FilterFieldValues string `index:"4" re:""`
ActivationInterval string `index:"5" re:""`
QueueLength int `index:"6" re:""`
TTL string `index:"7" re:""`
Metrics string `index:"8" re:""`
Blocker bool `index:"9" re:""`
Stored bool `index:"10" re:""`
Weight float64 `index:"11" re:"\d+\.?\d*"`
MinItems int `index:"12" re:""`
Thresholds string `index:"13" re:""`
FilterIDs string `index:"2" re:""`
ActivationInterval string `index:"3" re:""`
QueueLength int `index:"4" re:""`
TTL string `index:"5" re:""`
Metrics string `index:"6" re:""`
Blocker bool `index:"7" re:""`
Stored bool `index:"8" re:""`
Weight float64 `index:"9" re:"\d+\.?\d*"`
MinItems int `index:"10" re:""`
Thresholds string `index:"11" re:""`
CreatedAt time.Time
}
@@ -1967,7 +1967,7 @@ func testOnStorITCRUDStatQueueProfile(t *testing.T) {
sq := &StatQueueProfile{
ID: "test",
ActivationInterval: &utils.ActivationInterval{},
Filters: []*RequestFilter{},
FilterIDs: []string{},
QueueLength: 2,
TTL: timeTTL,
Metrics: []string{},
@@ -34,7 +34,7 @@ import (

// NewStatService initializes a StatService
func NewStatService(dm *DataManager, storeInterval time.Duration,
thdS rpcclient.RpcClientConnection) (ss *StatService, err error) {
thdS rpcclient.RpcClientConnection, filterS *FilterS) (ss *StatService, err error) {
if thdS != nil && reflect.ValueOf(thdS).IsNil() { // fix nil value in interface
thdS = nil
}
@@ -42,6 +42,7 @@ func NewStatService(dm *DataManager, storeInterval time.Duration,
dm: dm,
storeInterval: storeInterval,
thdS: thdS,
filterS: filterS,
storedStatQueues: make(utils.StringMap),
stopBackup: make(chan struct{})}, nil
}
@@ -51,6 +52,7 @@ type StatService struct {
dm *DataManager
storeInterval time.Duration
thdS rpcclient.RpcClientConnection // rpc connection towards ThresholdS
filterS *FilterS
stopBackup chan struct{}
storedStatQueues utils.StringMap // keep a record of stats which need saving, map[statsTenantID]bool
ssqMux sync.RWMutex // protects storedStatQueues
@@ -156,16 +158,9 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues
!sqPrfl.ActivationInterval.IsActiveAtTime(time.Now()) { // not active
continue
}
passAllFilters := true
for _, fltr := range sqPrfl.Filters {
if pass, err := fltr.Pass(ev.Event, "", sS); err != nil {
return nil, err
} else if !pass {
passAllFilters = false
continue
}
}
if !passAllFilters {
if pass, err := sS.filterS.PassFiltersForEvent(ev.Tenant, ev.Event, sqPrfl.FilterIDs); err != nil {
return nil, err
} else if !pass {
continue
}
s, err := sS.dm.GetStatQueue(sqPrfl.Tenant, sqPrfl.ID, false, "")
@@ -1222,11 +1222,6 @@ func (ms *MapStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *Stat
if err != nil {
return nil, err
}
for _, fltr := range sq.Filters {
if err := fltr.CompileValues(); err != nil {
return nil, err
}
}
return
}

@@ -1824,11 +1824,6 @@ func (ms *MongoStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *St
}
return nil, err
}
for _, fltr := range sq.Filters {
if err = fltr.CompileValues(); err != nil {
return
}
}
return
}

@@ -1374,11 +1374,6 @@ func (rs *RedisStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *St
if err = rs.ms.Unmarshal(values, &sq); err != nil {
return
}
for _, fltr := range sq.Filters {
if err = fltr.CompileValues(); err != nil {
return
}
}
return
}
@@ -1560,16 +1560,10 @@ func testStorDBitCRUDTpStats(t *testing.T) {
//WRITE
eTPs := []*utils.TPStats{
&utils.TPStats{
TPid: "TEST_TPID",
Tenant: "Test",
ID: "Stats1",
Filters: []*utils.TPRequestFilter{
&utils.TPRequestFilter{
Type: "filtertype",
FieldName: "Account",
Values: []string{"1001", "1002"},
},
},
TPid: "TEST_TPID",
Tenant: "Test",
ID: "Stats1",
FilterIDs: []string{"FLTR_1"},
ActivationInterval: &utils.TPActivationInterval{
ActivationTime: "2014-07-29T15:00:00Z",
},
@@ -1601,11 +1595,7 @@ func testStorDBitCRUDTpStats(t *testing.T) {
if !(reflect.DeepEqual(eTPs[0].Weight, rcv[0].Weight) || reflect.DeepEqual(eTPs[0].Weight, rcv[1].Weight)) {
t.Errorf("Expecting: %+v, received: %+v || %+v", eTPs[0].Weight, rcv[0].Weight, rcv[1].Weight)
}
for i, _ := range eTPs[0].Filters {
if !(reflect.DeepEqual(eTPs[0].Filters[i], rcv[0].Filters[i]) || reflect.DeepEqual(eTPs[0].Filters[i], rcv[1].Filters[i])) {
t.Errorf("Expecting: %+v, received: %+v || %+v", eTPs[0].Filters[i], rcv[0].Filters[i], rcv[1].Filters[i])
}
}

}
// UPDATE
eTPs[0].Weight = 2.1
@@ -64,6 +64,7 @@ type TpReader struct {
revAliases,
acntActionPlans map[string][]string
thdsIndexers map[string]*ReqFilterIndexer // tenant, indexer
sqpIndexers map[string]*ReqFilterIndexer // tenant, indexer
}

func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string) *TpReader {
@@ -139,6 +140,7 @@ func (tpr *TpReader) Init() {
tpr.revAliases = make(map[string][]string)
tpr.acntActionPlans = make(map[string][]string)
tpr.thdsIndexers = make(map[string]*ReqFilterIndexer)
tpr.sqpIndexers = make(map[string]*ReqFilterIndexer)
}

func (tpr *TpReader) LoadDestinationsFiltered(tag string) (bool, error) {
@@ -1622,7 +1624,7 @@ func (tpr *TpReader) LoadResourceProfiles() error {
return tpr.LoadResourceProfilesFiltered("")
}

func (tpr *TpReader) LoadStatsFiltered(tag string) error {
func (tpr *TpReader) LoadStatsFiltered(tag string) (err error) {
tps, err := tpr.lr.GetTPStats(tpr.tpid, tag)
if err != nil {
return err
@@ -1632,12 +1634,35 @@ func (tpr *TpReader) LoadStatsFiltered(tag string) error {
mapSTs[utils.TenantID{Tenant: st.Tenant, ID: st.ID}] = st
}
tpr.sqProfiles = mapSTs
for tenantid, _ := range mapSTs {
for tenantid, sq := range mapSTs {
sqpIndxrKey := utils.StatQueuesStringIndex + tenantid.TenantID()
if has, err := tpr.dm.DataDB().HasData(utils.StatQueuePrefix, tenantid.TenantID()); err != nil {
return err
} else if !has {
tpr.statQueues = append(tpr.statQueues, &utils.TenantID{Tenant: tenantid.Tenant, ID: tenantid.ID})
}
// index statQueues for filters
if _, has := tpr.sqpIndexers[tenantid.TenantID()]; !has {
if tpr.sqpIndexers[tenantid.TenantID()], err = NewReqFilterIndexer(tpr.dm, sqpIndxrKey); err != nil {
return
}
}
for _, fltrID := range sq.FilterIDs {
tpFltr, has := tpr.filters[utils.TenantID{Tenant: tenantid.Tenant, ID: fltrID}]
if !has {
var fltr *Filter
if fltr, err = tpr.dm.GetFilter(tenantid.Tenant, fltrID, false, utils.NonTransactional); err != nil {
if err == utils.ErrNotFound {
err = fmt.Errorf("broken reference to filter: %s for statQueue: %s", fltrID, sq)
}
return
} else {
tpFltr = FilterToTPFilter(fltr)
}
} else {
tpr.sqpIndexers[tenantid.TenantID()].IndexTPFilter(tpFltr, sq.ID)
}
}
}
return nil
}
@@ -1653,7 +1678,7 @@ func (tpr *TpReader) LoadThresholdsFiltered(tag string) (err error) {
}
mapTHs := make(map[utils.TenantID]*utils.TPThreshold)
for _, th := range tps {
mapTHs[utils.TenantID{th.Tenant, th.ID}] = th
mapTHs[utils.TenantID{Tenant: th.Tenant, ID: th.ID}] = th
}
tpr.thProfiles = mapTHs
for tntID, th := range mapTHs {
@@ -1700,7 +1725,7 @@ func (tpr *TpReader) LoadFiltersFiltered(tag string) error {
}
mapTHs := make(map[utils.TenantID]*utils.TPFilter)
for _, th := range tps {
mapTHs[utils.TenantID{th.Tenant, th.ID}] = th
mapTHs[utils.TenantID{Tenant: th.Tenant, ID: th.ID}] = th
}
tpr.filters = mapTHs
return nil
@@ -2163,28 +2188,19 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
}
}
}
if len(tpr.sqProfiles) > 0 {
if verbose {
log.Print("Indexing stats")

if verbose {
log.Print("StatQueue filter indexes:")
}
for tenant, fltrIdxer := range tpr.sqpIndexers {
if err := fltrIdxer.StoreIndexes(); err != nil {
return err
}
for tenantid, st := range tpr.sqProfiles {
stIdxr, err := NewReqFilterIndexer(tpr.dm, utils.StatQueuesStringIndex+tenantid.Tenant)
if err != nil {
return err
}
if st, err := APItoStats(st, tpr.timezone); err != nil {
return err
} else {
stIdxr.IndexFilters(st.ID, st.Filters)
}
if verbose {
log.Printf("Indexed Stats tenant: %s, keys %+v", tenantid.Tenant, stIdxr.ChangedKeys().Slice())
}
if err := stIdxr.StoreIndexes(); err != nil {
return err
}
if verbose {
log.Printf("Tenant: %s, keys %+v", tenant, fltrIdxer.ChangedKeys().Slice())
}
}

if verbose {
log.Print("Threshold filter indexes:")
}
@@ -1343,7 +1343,7 @@ type TPStats struct {
TPid string
Tenant string
ID string
Filters []*TPRequestFilter
FilterIDs []string
ActivationInterval *TPActivationInterval
QueueLength int
TTL string