added redis cache

Edwardro22
2017-07-26 16:49:22 +03:00
parent fbacb250b6
commit 8f7bd92915
11 changed files with 167 additions and 85 deletions

cmd/cgr-loader/cgr-loader.go (0 changes) Normal file → Executable file

engine/loader_csv_test.go (11 changes) Normal file → Executable file

@@ -1433,17 +1433,16 @@ func TestLoadStats(t *testing.T) {
 		},
 		QueueLength: 100,
 		TTL:         "1s",
-		Metrics:     "*asr;*acd;*acc",
+		Metrics:     []string{"*asr", "*acd", "*acc"},
 		Store:       true,
-		Thresholds:  "THRESH1;THRESH2",
+		Thresholds:  []string{"THRESH1", "THRESH2"},
 		Weight:      20,
 	},
 }
-	// if len(csvr.stats) != len(eStats) {
-	// 	t.Error("Failed to load stats: ", len(csvr.stats))
-	// } else
-	if !reflect.DeepEqual(eStats["Stats1"], csvr.stats["Stats1"]) {
+	if len(csvr.stats) != len(eStats) {
+		t.Error("Failed to load stats: ", len(csvr.stats))
+	} else if !reflect.DeepEqual(eStats["Stats1"], csvr.stats["Stats1"]) {
 		t.Errorf("Expecting: %+v, received: %+v", eStats["Stats1"], csvr.stats["Stats1"])
 	}
 }

engine/model_helpers.go (115 changes) Normal file → Executable file

@@ -1956,13 +1956,19 @@ func (tps TpStatsS) AsTPStats() (result []*utils.TPStats) {
 		rl.TTL = tp.TTL
 	}
 	if tp.Metrics != "" {
-		rl.Metrics = tp.Metrics
+		metrSplt := strings.Split(tp.Metrics, utils.INFIELD_SEP)
+		for _, metr := range metrSplt {
+			rl.Metrics = append(rl.Metrics, metr)
+		}
 	}
 	if tp.Store != false {
 		rl.Store = tp.Store
 	}
 	if tp.Thresholds != "" {
-		rl.Thresholds = tp.Thresholds
+		trshSplt := strings.Split(tp.Thresholds, utils.INFIELD_SEP)
+		for _, trsh := range trshSplt {
+			rl.Thresholds = append(rl.Thresholds, trsh)
+		}
 	}
 	if tp.Weight != 0 {
 		rl.Weight = tp.Weight
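Reviewer note: since rl.Metrics and rl.Thresholds start out empty, the element-by-element loops are equivalent to a single variadic append (rl.Metrics = append(rl.Metrics, metrSplt...)). A minimal standalone sketch of the new split behavior, assuming utils.INFIELD_SEP holds ";" (its conventional value in this codebase):

package main

import (
	"fmt"
	"strings"
)

// infieldSep stands in for utils.INFIELD_SEP, assumed here to be ";".
const infieldSep = ";"

func main() {
	// A TpStats row stores metrics as one semicolon-separated string...
	raw := "*asr;*acd;*acc"
	// ...which AsTPStats now splits into a []string for the API type.
	metrics := strings.Split(raw, infieldSep)
	fmt.Println(metrics) // [*asr *acd *acc]
}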
@@ -1994,77 +2000,36 @@ func (tps TpStatsS) AsTPStats() (result []*utils.TPStats) {
 	return
 }
-// func APItoModelTPStats(tps *utils.TPStats) (mdls TpStatsS) {
-// 	if len(rl.Filters) == 0 {
-// 		return
-// 	}
-// 	for i, fltr := range rl.Filters {
-// 		mdl := &TpStats{
-// 			Tpid: tps.TPid,
-// 			Tag:  tps.ID,
-// 		}
-// 		if i == 0 {
-// 			mdl.QueueLength = tps.QueueLength
-// 			mdl.TTL = tps.TTL
-// 			mdl.Metrics = tps.Metrics
-// 			mdl.Store = tps.Store
-// 			mdl.Thresholds = tps.Thresholds
-// 			mdl.Weight = tps.Weight
-// 			if tps.ActivationInterval != nil {
-// 				if tps.ActivationInterval.ActivationTime != "" {
-// 					mdl.ActivationInterval = tps.ActivationInterval.ActivationTime
-// 				}
-// 				if tps.ActivationInterval.ExpiryTime != "" {
-// 					mdl.ActivationInterval += utils.INFIELD_SEP + tps.ActivationInterval.ExpiryTime
-// 				}
-// 			}
-// 		}
-// 		mdl.FilterType = fltr.Type
-// 		mdl.FilterFieldName = fltr.FieldName
-// 		for i, val := range fltr.Values {
-// 			if i != 0 {
-// 				mdl.FilterFieldValues = mdl.FilterFieldValues + utils.INFIELD_SEP + val
-// 			} else {
-// 				mdl.FilterFieldValues = val
-// 			}
-// 		}
-// 		mdls = append(mdls, mdl)
-// 	}
-// 	return
-// }FilterFieldValues = mdl.FilterFieldValues + utils.INFIELD_SEP + val
-// 	} else {
-// 		mdl.FilterFieldValues = val
-// 	}
-// }
-// mdls = append(mdls, mdl)
-// }
-// return
-// }
-// func APItoTPStats(tpST *utils.TPStats, timezone string) (st *TpStats, err error) {
-// 	st = &ResourceLimit{ID: tpST.ID, Weight: tpST.Weight,
-// 		Filters: make([]*RequestFilter, len(tpST.Filters)), Usage: make(map[string]*ResourceUsage)}
-// 	if tpST.TTL != "" {
-// 		if rl.TTL, err = utils.ParseDurationWithSecs(tpST.TTL); err != nil {
-// 			return nil, err
-// 		}
-// 	}
-// 	for i, f := range tpST.Filters {
-// 		rf := &RequestFilter{Type: f.Type, FieldName: f.FieldName, Values: f.Values}
-// 		if err := rf.CompileValues(); err != nil {
-// 			return nil, err
-// 		}
-// 		st.Filters[i] = rf
-// 	}
-// 	if tpST.ActivationInterval != nil {
-// 		if st.ActivationInterval, err = tpST.ActivationInterval.AsActivationInterval(timezone); err != nil {
-// 			return nil, err
-// 		}
-// 	}
-// 	if tpST.Thresholds != "" {
-// 		if st.Thresholds, err = strconv.ParseFloat(tpST.Thresholds, 64); err != nil {
-// 			return nil, err
-// 		}
-// 	}
-// 	return st, nil
-// }
+func APItoTPStats(tpST *utils.TPStats, timezone string) (st *StatsQueue, err error) {
+	st = &StatsQueue{
+		ID:          tpST.ID,
+		QueueLength: tpST.QueueLength,
+		Store:       tpST.Store,
+		Filters:     make([]*RequestFilter, len(tpST.Filters)),
+	}
+	if tpST.TTL != "" {
+		if st.TTL, err = utils.ParseDurationWithSecs(tpST.TTL); err != nil {
+			return nil, err
+		}
+	}
+	for _, metr := range tpST.Metrics {
+		st.Metrics = append(st.Metrics, metr)
+	}
+	for _, trh := range tpST.Thresholds {
+		st.Thresholds = append(st.Thresholds, trh)
+	}
+	for i, f := range tpST.Filters {
+		rf := &RequestFilter{Type: f.Type, FieldName: f.FieldName, Values: f.Values}
+		if err := rf.CompileValues(); err != nil {
+			return nil, err
+		}
+		st.Filters[i] = rf
+	}
+	if tpST.ActivationInterval != nil {
+		if st.ActivationInterval, err = tpST.ActivationInterval.AsActivationInterval(timezone); err != nil {
+			return nil, err
+		}
+	}
+	return st, nil
+}
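For orientation, a self-contained sketch of the conversion the new APItoTPStats performs on the TTL and list fields. Here time.ParseDuration stands in for utils.ParseDurationWithSecs (the real helper also accepts bare-seconds strings), and both structs are pared-down hypothetical stand-ins, not the real types:

package main

import (
	"fmt"
	"time"
)

// tpStats is a pared-down stand-in for utils.TPStats (subset of fields).
type tpStats struct {
	ID         string
	TTL        string
	Metrics    []string
	Thresholds []string
}

// statsQueue mirrors the runtime type: TTL becomes a concrete time.Duration.
type statsQueue struct {
	ID         string
	TTL        time.Duration
	Metrics    []string
	Thresholds []string
}

func apiToStatsQueue(tp *tpStats) (*statsQueue, error) {
	st := &statsQueue{ID: tp.ID}
	if tp.TTL != "" {
		ttl, err := time.ParseDuration(tp.TTL) // stand-in for utils.ParseDurationWithSecs
		if err != nil {
			return nil, err
		}
		st.TTL = ttl
	}
	st.Metrics = append(st.Metrics, tp.Metrics...)
	st.Thresholds = append(st.Thresholds, tp.Thresholds...)
	return st, nil
}

func main() {
	st, err := apiToStatsQueue(&tpStats{ID: "Stats1", TTL: "1s",
		Metrics: []string{"*asr", "*acd", "*acc"}, Thresholds: []string{"THRESH1", "THRESH2"}})
	fmt.Println(st, err) // &{Stats1 1s [*asr *acd *acc] [THRESH1 THRESH2]} <nil>
}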

engine/model_helpers_test.go
@@ -782,3 +782,87 @@ func TestAPItoResourceLimit(t *testing.T) {
 		t.Errorf("Expecting: %+v, received: %+v", eRL, rl)
 	}
 }
+
+func TestTPStatsAsTPStats(t *testing.T) {
+	tps := []*TpStats{
+		&TpStats{
+			Tpid:               "TEST_TPID",
+			Tag:                "Stats1",
+			FilterType:         MetaStringPrefix,
+			FilterFieldName:    "Account",
+			FilterFieldValues:  "1001;1002",
+			ActivationInterval: "2014-07-29T15:00:00Z",
+			QueueLength:        100,
+			TTL:                "1s",
+			Metrics:            "*asr;*acd;*acc",
+			Store:              true,
+			Thresholds:         "THRESH1;THRESH2",
+			Weight:             20.0,
+		},
+	}
+	eTPs := []*utils.TPStats{
+		&utils.TPStats{
+			TPid: tps[0].Tpid,
+			ID:   tps[0].Tag,
+			Filters: []*utils.TPRequestFilter{
+				&utils.TPRequestFilter{
+					Type:      tps[0].FilterType,
+					FieldName: tps[0].FilterFieldName,
+					Values:    []string{"1001", "1002"},
+				},
+			},
+			ActivationInterval: &utils.TPActivationInterval{
+				ActivationTime: tps[0].ActivationInterval,
+			},
+			QueueLength: tps[0].QueueLength,
+			TTL:         tps[0].TTL,
+			Metrics:     []string{"*asr", "*acd", "*acc"},
+			Store:       tps[0].Store,
+			Thresholds:  []string{"THRESH1", "THRESH2"},
+			Weight:      tps[0].Weight,
+		},
+	}
+	rcvTPs := TpStatsS(tps).AsTPStats()
+	if !(reflect.DeepEqual(eTPs, rcvTPs) || reflect.DeepEqual(eTPs[0], rcvTPs[0])) {
+		t.Errorf("\nExpecting:\n%+v\nReceived:\n%+v", utils.ToIJSON(eTPs), utils.ToIJSON(rcvTPs))
+	}
+}
+
+func TestAPItoTPStats(t *testing.T) {
+	tps := &utils.TPStats{
+		TPid: testTPID,
+		ID:   "Stats1",
+		Filters: []*utils.TPRequestFilter{
+			&utils.TPRequestFilter{Type: MetaString, FieldName: "Account", Values: []string{"1001", "1002"}},
+		},
+		ActivationInterval: &utils.TPActivationInterval{ActivationTime: "2014-07-29T15:00:00Z"},
+		QueueLength:        100,
+		TTL:                "1s",
+		Metrics:            []string{"*asr", "*acd", "*acc"},
+		Store:              true,
+		Thresholds:         []string{"THRESH1", "THRESH2"},
+		Weight:             20.0,
+	}
+	eTPs := &StatsQueue{ID: tps.ID,
+		QueueLength: tps.QueueLength,
+		Metrics:     []string{"*asr", "*acd", "*acc"},
+		Store:       tps.Store,
+		Thresholds:  []string{"THRESH1", "THRESH2"},
+		Filters:     make([]*RequestFilter, len(tps.Filters)),
+	}
+	if eTPs.TTL, err = utils.ParseDurationWithSecs(tps.TTL); err != nil {
+		t.Errorf("Got error: %+v", err)
+	}
+	eTPs.Filters[0] = &RequestFilter{Type: MetaString,
+		FieldName: "Account", Values: []string{"1001", "1002"}}
+	at, _ := utils.ParseTimeDetectLayout("2014-07-29T15:00:00Z", "UTC")
+	eTPs.ActivationInterval = &utils.ActivationInterval{ActivationTime: at}
+	if st, err := APItoTPStats(tps, "UTC"); err != nil {
+		t.Error(err)
+	} else if !reflect.DeepEqual(eTPs, st) {
+		t.Errorf("Expecting: %+v, received: %+v", eTPs, st)
+	}
+}

engine/models.go (0 changes) Normal file → Executable file


@@ -52,7 +52,7 @@ type StatsQueue struct {
 	ActivationInterval *utils.ActivationInterval // Activation interval
 	Filters            []*RequestFilter
 	QueueLength        int
-	TTL                *time.Duration
+	TTL                time.Duration
 	Metrics            []string // list of metrics to build
 	Store              bool     // store to DB
 	Thresholds         []string // list of thresholds to be checked after changes
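Dropping the pointer trades a nil check for the zero value: the `if tpST.TTL != ""` guard in APItoTPStats only sets TTL when one is configured, so a zero time.Duration can stand in for "no TTL". A quick illustration of the two styles (assuming, as the guard suggests, that callers treat 0 as unset):

package main

import (
	"fmt"
	"time"
)

func main() {
	var ttlPtr *time.Duration // old style: must nil-check before use
	var ttl time.Duration     // new style: zero value means "no TTL"

	if ttlPtr != nil && *ttlPtr > 0 {
		fmt.Println("pointer TTL set")
	}
	if ttl > 0 {
		fmt.Println("TTL set")
	} else {
		fmt.Println("no TTL") // printed: the zero value replaces nil
	}
}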

engine/storage_csv.go (0 changes) Normal file → Executable file

engine/tp_reader.go

@@ -1982,6 +1982,28 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
 			return err
 		}
 	}
+	if len(tpr.stats) > 0 {
+		if verbose {
+			log.Print("Indexing stats")
+		}
+		stIdxr, err := NewReqFilterIndexer(tpr.dataStorage, utils.StatsIndex)
+		if err != nil {
+			return err
+		}
+		for _, tpST := range tpr.stats {
+			if st, err := APItoTPStats(tpST, tpr.timezone); err != nil {
+				return err
+			} else {
+				stIdxr.IndexFilters(st.ID, st.Filters)
+			}
+		}
+		if verbose {
+			log.Printf("Indexed Stats keys: %+v", stIdxr.ChangedKeys().Slice())
+		}
+		if err := stIdxr.StoreIndexes(); err != nil {
+			return err
+		}
+	}
 	}
 	return
 }
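The new block mirrors the resource-limit indexing earlier in WriteToDatabase: each stats queue's filters are fed into a reverse index so queues can later be looked up by filter field and value. The ReqFilterIndexer API is CGRateS-specific; purely as a hypothetical toy illustration of the idea (not the real API):

package main

import "fmt"

// toyIndexer is a hypothetical sketch of a filter indexer:
// it maps "FieldName:Value" keys to the set of item IDs that use them.
type toyIndexer struct {
	index map[string]map[string]bool
}

func newToyIndexer() *toyIndexer {
	return &toyIndexer{index: make(map[string]map[string]bool)}
}

func (ti *toyIndexer) indexFilters(id, fieldName string, values []string) {
	for _, val := range values {
		key := fieldName + ":" + val
		if ti.index[key] == nil {
			ti.index[key] = make(map[string]bool)
		}
		ti.index[key][id] = true
	}
}

func main() {
	idx := newToyIndexer()
	idx.indexFilters("Stats1", "Account", []string{"1001", "1002"})
	fmt.Println(idx.index) // e.g. map[Account:1001:map[Stats1:true] Account:1002:map[Stats1:true]]
}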
@@ -2045,6 +2067,8 @@ func (tpr *TpReader) ShowStatistics() {
 	log.Print("CDR stats: ", len(tpr.cdrStats))
 	// resource limits
 	log.Print("ResourceLimits: ", len(tpr.resLimits))
+	// stats
+	log.Print("Stats: ", len(tpr.stats))
 }
// Returns the identities loaded for a specific category, useful for cache reloads
@@ -2178,6 +2202,14 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) {
 			i++
 		}
 		return keys, nil
+	case utils.StatsPrefix:
+		keys := make([]string, len(tpr.stats))
+		i := 0
+		for k := range tpr.stats {
+			keys[i] = k
+			i++
+		}
+		return keys, nil
 	}
 	return nil, errors.New("Unsupported load category")
 }

engine/tpimporter_csv.go (0 changes) Normal file → Executable file

utils/apitpdata.go (4 changes) Normal file → Executable file

@@ -1331,8 +1331,8 @@ type TPStats struct {
 	ActivationInterval *TPActivationInterval
 	QueueLength        int
 	TTL                string
-	Metrics            string
+	Metrics            []string
 	Store              bool
-	Thresholds         string
+	Thresholds         []string
 	Weight             float64
 }
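Since utils.TPStats is part of the JSON API surface, this type change is visible on the wire: Metrics and Thresholds now marshal as JSON arrays rather than semicolon-joined strings. A small sketch with a field subset (struct names hypothetical):

package main

import (
	"encoding/json"
	"fmt"
)

// Old and new shapes of the two changed fields (subset of TPStats).
type oldTPStats struct {
	Metrics    string
	Thresholds string
}

type newTPStats struct {
	Metrics    []string
	Thresholds []string
}

func main() {
	o, _ := json.Marshal(oldTPStats{Metrics: "*asr;*acd;*acc", Thresholds: "THRESH1;THRESH2"})
	n, _ := json.Marshal(newTPStats{Metrics: []string{"*asr", "*acd", "*acc"},
		Thresholds: []string{"THRESH1", "THRESH2"}})
	fmt.Println(string(o)) // {"Metrics":"*asr;*acd;*acc","Thresholds":"THRESH1;THRESH2"}
	fmt.Println(string(n)) // {"Metrics":["*asr","*acd","*acc"],"Thresholds":["THRESH1","THRESH2"]}
}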

utils/constants.go

@@ -101,7 +101,7 @@ const (
 	USERS_CSV         = "Users.csv"
 	ALIASES_CSV       = "Aliases.csv"
 	ResourceLimitsCsv = "ResourceLimits.csv"
-	StatsCsv = "Stats.csv"
+	StatsCsv          = "Stats.csv"
 	ROUNDING_UP       = "*up"
 	ROUNDING_MIDDLE   = "*middle"
 	ROUNDING_DOWN     = "*down"
@@ -214,6 +214,8 @@ const (
 	REVERSE_ALIASES_PREFIX  = "rls_"
 	ResourceLimitsPrefix    = "rlm_"
 	ResourceLimitsIndex     = "rli_"
+	StatsPrefix             = "sts_"
+	StatsIndex              = "sti_"
 	TimingsPrefix           = "tmg_"
 	CDR_STATS_PREFIX        = "cst_"
 	TEMP_DESTINATION_PREFIX = "tmp_"
TEMP_DESTINATION_PREFIX = "tmp_"