Added compression support for StatQueue

This commit is contained in:
Tripon Alexandru-Ionut
2019-04-04 11:06:16 +03:00
committed by Dan Christian Bogos
parent 33a243d3f4
commit bae3ece551
7 changed files with 365 additions and 10 deletions

View File

@@ -484,6 +484,7 @@ const CGRATES_CFG_JSON = `
"stats": { // StatS config
"enabled": false, // starts Stat service: <true|false>.
"store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur>
"max_queue_lenght": 0, // used to compress data
"thresholds_conns": [], // connections to ThresholdS for StatUpdates, empty to disable thresholds functionality: <""|*internal|x.y.z.y:1234>
"indexed_selects":true, // enable profile matching exclusively on indexes
//"string_indexed_fields": [], // query indexes based on these fields for faster processing

View File

@@ -800,6 +800,7 @@ func TestDfStatServiceJsonCfg(t *testing.T) {
Enabled: utils.BoolPointer(false),
Indexed_selects: utils.BoolPointer(true),
Store_interval: utils.StringPointer(""),
Max_queue_lenght: utils.IntPointer(0),
Thresholds_conns: &[]*HaPoolJsonCfg{},
String_indexed_fields: nil,
Prefix_indexed_fields: &[]string{},

View File

@@ -437,6 +437,7 @@ type StatServJsonCfg struct {
Enabled *bool
Indexed_selects *bool
Store_interval *string
Max_queue_lenght *int
Thresholds_conns *[]*HaPoolJsonCfg
String_indexed_fields *[]string
Prefix_indexed_fields *[]string

View File

@@ -28,6 +28,7 @@ type StatSCfg struct {
Enabled bool
IndexedSelects bool
StoreInterval time.Duration // Dump regularly from cache into dataDB
MaxQueueLenght int
ThresholdSConns []*RemoteHost
StringIndexedFields *[]string
PrefixIndexedFields *[]string
@@ -48,6 +49,9 @@ func (st *StatSCfg) loadFromJsonCfg(jsnCfg *StatServJsonCfg) (err error) {
return err
}
}
if jsnCfg.Max_queue_lenght != nil {
st.MaxQueueLenght = *jsnCfg.Max_queue_lenght
}
if jsnCfg.Thresholds_conns != nil {
st.ThresholdSConns = make([]*RemoteHost, len(*jsnCfg.Thresholds_conns))
for idx, jsnHaCfg := range *jsnCfg.Thresholds_conns {

View File

@@ -458,6 +458,7 @@
// "stats": { // StatS config
// "enabled": false, // starts Stat service: <true|false>.
// "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur>
// "max_queue_lenght": 0, // used to compress data
// "thresholds_conns": [], // connections to ThresholdS for StatUpdates, empty to disable thresholds functionality: <""|*internal|x.y.z.y:1234>
// "indexed_selects":true, // enable profile matching exclusively on indexes
// //"string_indexed_fields": [], // query indexes based on these fields for faster processing

View File

@@ -56,11 +56,12 @@ type MetricWithFilters struct {
// NewStoredStatQueue initiates a StoredStatQueue out of StatQueue
func NewStoredStatQueue(sq *StatQueue, ms Marshaler) (sSQ *StoredStatQueue, err error) {
sSQ = &StoredStatQueue{
Tenant: sq.Tenant,
ID: sq.ID,
SQItems: make([]SQItem, len(sq.SQItems)),
SQMetrics: make(map[string][]byte, len(sq.SQMetrics)),
MinItems: sq.MinItems,
Tenant: sq.Tenant,
ID: sq.ID,
Compressed: sq.Compress(int64(config.CgrConfig().StatSCfg().MaxQueueLenght)),
SQItems: make([]SQItem, len(sq.SQItems)),
SQMetrics: make(map[string][]byte, len(sq.SQMetrics)),
MinItems: sq.MinItems,
}
for i, sqItm := range sq.SQItems {
sSQ.SQItems[i] = sqItm
@@ -77,11 +78,12 @@ func NewStoredStatQueue(sq *StatQueue, ms Marshaler) (sSQ *StoredStatQueue, err
// StoredStatQueue differs from StatQueue due to serialization of SQMetrics
type StoredStatQueue struct {
Tenant string
ID string
SQItems []SQItem
SQMetrics map[string][]byte
MinItems int
Tenant string
ID string
SQItems []SQItem
SQMetrics map[string][]byte
MinItems int
Compressed bool
}
// SqID will compose the unique identifier for the StatQueue out of Tenant and ID
@@ -110,6 +112,9 @@ func (ssq *StoredStatQueue) AsStatQueue(ms Marshaler) (sq *StatQueue, err error)
sq.SQMetrics[metricID] = metric
}
}
if ssq.Compressed {
sq.Expand()
}
return
}
@@ -223,6 +228,69 @@ func (sq *StatQueue) addStatEvent(ev *utils.CGREvent, filterS *FilterS) (err err
return
}
// Compress asks every metric to compress its events and rebuilds sq.SQItems
// so the queue holds at most one item per surviving event ID.
// A maxQL of 0 disables compression; queues shorter than maxQL are left alone.
// Returns true when the queue was actually compressed.
func (sq *StatQueue) Compress(maxQL int64) bool {
	if maxQL == 0 || int64(len(sq.SQItems)) < maxQL {
		return false
	}
	defaultCompressID := utils.UUIDSha1Prefix()
	// last item's expiry is used for the synthetic aggregated item
	defaultTTL := sq.SQItems[len(sq.SQItems)-1].ExpiryTime
	// index every known event ID to its expiry time
	sqMap := map[string]*time.Time{defaultCompressID: defaultTTL}
	for _, item := range sq.SQItems {
		sqMap[item.EventID] = item.ExpiryTime
	}
	// collect the event IDs still referenced by the metrics after compression
	idMap := make(map[string]struct{})
	for _, m := range sq.SQMetrics {
		for _, id := range m.Compress(maxQL, defaultCompressID) {
			idMap[id] = struct{}{}
		}
	}
	var newSQItems []SQItem
	for id := range idMap {
		ttl, has := sqMap[id]
		if !has { // log warning
			ttl = defaultTTL
		}
		newSQItems = append(newSQItems, SQItem{
			EventID:    id,
			ExpiryTime: ttl,
		})
	}
	if sq.ttl != nil {
		// keep items ordered by expiry so TTL-based removal stays correct;
		// items without an expiry sort last
		sort.Slice(newSQItems, func(i, j int) bool {
			if newSQItems[i].ExpiryTime == nil {
				return false
			}
			if newSQItems[j].ExpiryTime == nil {
				return true
			}
			return newSQItems[i].ExpiryTime.Before(*newSQItems[j].ExpiryTime)
		})
	}
	sq.SQItems = newSQItems
	return true
}
// Expand undoes a previous Compress: each remaining SQItem is replicated
// according to the largest compress factor any metric reports for its
// event ID, so the queue again holds one SQItem per represented event.
func (sq *StatQueue) Expand() {
	compressFactorMap := make(map[string]int)
	for _, m := range sq.SQMetrics {
		compressFactorMap = m.GetCompressFactor(compressFactorMap)
	}
	var newSQItems []SQItem
	for _, sqi := range sq.SQItems {
		cf, has := compressFactorMap[sqi.EventID]
		if !has {
			// no metric references this event anymore: drop it
			continue
		}
		for i := 0; i < cf; i++ {
			newSQItems = append(newSQItems, sqi)
		}
	}
	sq.SQItems = newSQItems
}
// StatQueues is a sortable list of StatQueue
type StatQueues []*StatQueue

View File

@@ -80,6 +80,8 @@ type StatMetric interface {
Marshal(ms Marshaler) (marshaled []byte, err error)
LoadMarshaled(ms Marshaler, marshaled []byte) (err error)
GetFilterIDs() (filterIDs []string)
Compress(queueLen int64, defaultID string) (eventIDs []string)
GetCompressFactor(events map[string]int) map[string]int
}
func NewASR(minItems int, extraParams string, filterIDs []string) (StatMetric, error) {
@@ -190,6 +192,36 @@ func (asr *StatASR) GetFilterIDs() []string {
return asr.FilterIDs
}
// Compress is part of StatMetric interface; once Count reaches queueLen it
// folds all per-event entries into a single aggregated entry under defaultID.
func (asr *StatASR) Compress(queueLen int64, defaultID string) (eventIDs []string) {
	if asr.Count < queueLen { // below threshold: keep events as they are
		for id := range asr.Events {
			eventIDs = append(eventIDs, id)
		}
		return
	}
	// replace all events with one entry holding the rounded Answered/Count ratio
	stat := &StatWithCompress{
		Stat: utils.Round(asr.Answered/float64(asr.Count),
			config.CgrConfig().GeneralCfg().RoundingDecimals, utils.ROUNDING_MIDDLE),
		CompressFactor: int(asr.Count),
	}
	asr.Events = map[string]*StatWithCompress{defaultID: stat}
	return []string{defaultID}
}
// GetCompressFactor is part of StatMetric interface; it records into events
// the largest compress factor seen for each event ID.
func (asr *StatASR) GetCompressFactor(events map[string]int) map[string]int {
	for id, val := range asr.Events {
		if cf, has := events[id]; !has || cf < val.CompressFactor {
			events[id] = val.CompressFactor
		}
	}
	return events
}
// NewACD creates a new ACD StatMetric; extraParams is unused for this metric.
func NewACD(minItems int, extraParams string, filterIDs []string) (StatMetric, error) {
	return &StatACD{Events: make(map[string]*DurationWithCompress), MinItems: minItems, FilterIDs: filterIDs}, nil
}
@@ -288,6 +320,35 @@ func (acd *StatACD) GetFilterIDs() []string {
return acd.FilterIDs
}
// Compress is part of StatMetric interface; once Count reaches queueLen it
// folds all per-event entries into a single aggregated entry under defaultID.
func (acd *StatACD) Compress(queueLen int64, defaultID string) (eventIDs []string) {
	if acd.Count < queueLen { // below threshold: keep events as they are
		for id := range acd.Events {
			eventIDs = append(eventIDs, id)
		}
		return
	}
	// replace all events with one entry holding the average duration
	stat := &DurationWithCompress{
		Duration:       time.Duration(acd.Sum.Nanoseconds() / acd.Count),
		CompressFactor: int(acd.Count),
	}
	acd.Events = map[string]*DurationWithCompress{defaultID: stat}
	return []string{defaultID}
}
// GetCompressFactor is part of StatMetric interface; it records into events
// the largest compress factor seen for each event ID.
func (acd *StatACD) GetCompressFactor(events map[string]int) map[string]int {
	for id, val := range acd.Events {
		if cf, has := events[id]; !has || cf < val.CompressFactor {
			events[id] = val.CompressFactor
		}
	}
	return events
}
// NewTCD creates a new TCD StatMetric; extraParams is unused for this metric.
func NewTCD(minItems int, extraParams string, filterIDs []string) (StatMetric, error) {
	return &StatTCD{Events: make(map[string]*DurationWithCompress), MinItems: minItems, FilterIDs: filterIDs}, nil
}
@@ -387,6 +448,35 @@ func (tcd *StatTCD) GetFilterIDs() []string {
return tcd.FilterIDs
}
// Compress is part of StatMetric interface; once Count reaches queueLen it
// folds all per-event entries into a single aggregated entry under defaultID.
func (tcd *StatTCD) Compress(queueLen int64, defaultID string) (eventIDs []string) {
	if tcd.Count < queueLen { // below threshold: keep events as they are
		for id := range tcd.Events {
			eventIDs = append(eventIDs, id)
		}
		return
	}
	// replace all events with one entry holding the average duration
	stat := &DurationWithCompress{
		Duration:       time.Duration(tcd.Sum.Nanoseconds() / tcd.Count),
		CompressFactor: int(tcd.Count),
	}
	tcd.Events = map[string]*DurationWithCompress{defaultID: stat}
	return []string{defaultID}
}
// GetCompressFactor is part of StatMetric interface; it records into events
// the largest compress factor seen for each event ID.
func (tcd *StatTCD) GetCompressFactor(events map[string]int) map[string]int {
	for id, val := range tcd.Events {
		if cf, has := events[id]; !has || cf < val.CompressFactor {
			events[id] = val.CompressFactor
		}
	}
	return events
}
// NewACC creates a new ACC StatMetric; extraParams is unused for this metric.
func NewACC(minItems int, extraParams string, filterIDs []string) (StatMetric, error) {
	return &StatACC{Events: make(map[string]*StatWithCompress), MinItems: minItems, FilterIDs: filterIDs}, nil
}
@@ -481,6 +571,36 @@ func (acc *StatACC) GetFilterIDs() []string {
return acc.FilterIDs
}
// Compress is part of StatMetric interface; once Count reaches queueLen it
// folds all per-event entries into a single aggregated entry under defaultID.
func (acc *StatACC) Compress(queueLen int64, defaultID string) (eventIDs []string) {
	if acc.Count < queueLen { // below threshold: keep events as they are
		for id := range acc.Events {
			eventIDs = append(eventIDs, id)
		}
		return
	}
	// replace all events with one entry holding the rounded Sum/Count average
	stat := &StatWithCompress{
		Stat: utils.Round((acc.Sum / float64(acc.Count)),
			config.CgrConfig().GeneralCfg().RoundingDecimals, utils.ROUNDING_MIDDLE),
		CompressFactor: int(acc.Count),
	}
	acc.Events = map[string]*StatWithCompress{defaultID: stat}
	return []string{defaultID}
}
// GetCompressFactor is part of StatMetric interface; it records into events
// the largest compress factor seen for each event ID.
func (acc *StatACC) GetCompressFactor(events map[string]int) map[string]int {
	for id, val := range acc.Events {
		if cf, has := events[id]; !has || cf < val.CompressFactor {
			events[id] = val.CompressFactor
		}
	}
	return events
}
// NewTCC creates a new TCC StatMetric; extraParams is unused for this metric.
func NewTCC(minItems int, extraParams string, filterIDs []string) (StatMetric, error) {
	return &StatTCC{Events: make(map[string]*StatWithCompress), MinItems: minItems, FilterIDs: filterIDs}, nil
}
@@ -577,6 +697,36 @@ func (tcc *StatTCC) GetFilterIDs() []string {
return tcc.FilterIDs
}
// Compress is part of StatMetric interface; once Count reaches queueLen it
// folds all per-event entries into a single aggregated entry under defaultID.
func (tcc *StatTCC) Compress(queueLen int64, defaultID string) (eventIDs []string) {
	if tcc.Count < queueLen { // below threshold: keep events as they are
		for id := range tcc.Events {
			eventIDs = append(eventIDs, id)
		}
		return
	}
	// replace all events with one entry holding the rounded Sum/Count average
	stat := &StatWithCompress{
		Stat: utils.Round((tcc.Sum / float64(tcc.Count)),
			config.CgrConfig().GeneralCfg().RoundingDecimals, utils.ROUNDING_MIDDLE),
		CompressFactor: int(tcc.Count),
	}
	tcc.Events = map[string]*StatWithCompress{defaultID: stat}
	return []string{defaultID}
}
// GetCompressFactor is part of StatMetric interface; it records into events
// the largest compress factor seen for each event ID.
func (tcc *StatTCC) GetCompressFactor(events map[string]int) map[string]int {
	for id, val := range tcc.Events {
		if cf, has := events[id]; !has || cf < val.CompressFactor {
			events[id] = val.CompressFactor
		}
	}
	return events
}
// NewPDD creates a new PDD StatMetric; extraParams is unused for this metric.
func NewPDD(minItems int, extraParams string, filterIDs []string) (StatMetric, error) {
	return &StatPDD{Events: make(map[string]*DurationWithCompress), MinItems: minItems, FilterIDs: filterIDs}, nil
}
@@ -675,6 +825,35 @@ func (pdd *StatPDD) GetFilterIDs() []string {
return pdd.FilterIDs
}
// Compress is part of StatMetric interface; once Count reaches queueLen it
// folds all per-event entries into a single aggregated entry under defaultID.
func (pdd *StatPDD) Compress(queueLen int64, defaultID string) (eventIDs []string) {
	if pdd.Count < queueLen { // below threshold: keep events as they are
		for id := range pdd.Events {
			eventIDs = append(eventIDs, id)
		}
		return
	}
	// replace all events with one entry holding the average duration
	stat := &DurationWithCompress{
		Duration:       time.Duration(pdd.Sum.Nanoseconds() / pdd.Count),
		CompressFactor: int(pdd.Count),
	}
	pdd.Events = map[string]*DurationWithCompress{defaultID: stat}
	return []string{defaultID}
}
// GetCompressFactor is part of StatMetric interface; it records into events
// the largest compress factor seen for each event ID.
func (pdd *StatPDD) GetCompressFactor(events map[string]int) map[string]int {
	for id, val := range pdd.Events {
		if cf, has := events[id]; !has || cf < val.CompressFactor {
			events[id] = val.CompressFactor
		}
	}
	return events
}
// NewDDC creates a new DDC StatMetric; extraParams is unused for this metric.
func NewDDC(minItems int, extraParams string, filterIDs []string) (StatMetric, error) {
	return &StatDDC{Destinations: make(map[string]utils.StringMap),
		Events: make(map[string]string), MinItems: minItems, FilterIDs: filterIDs}, nil
}
@@ -750,6 +929,26 @@ func (ddc *StatDDC) GetFilterIDs() []string {
return ddc.FilterIDs
}
// Compress is part of StatMetric interface; DDC events are never aggregated,
// so it only reports the IDs of the events currently held.
func (ddc *StatDDC) Compress(queueLen int64, defaultID string) (eventIDs []string) {
	for id := range ddc.Events {
		eventIDs = append(eventIDs, id)
	}
	return
}
// GetCompressFactor is part of StatMetric interface; DDC events are never
// compressed, so every held event contributes a factor of at least 1.
func (ddc *StatDDC) GetCompressFactor(events map[string]int) map[string]int {
	for id := range ddc.Events {
		if cf, has := events[id]; !has || cf < 1 {
			events[id] = 1
		}
	}
	return events
}
// NewStatSum creates a new StatSum metric summing the field named by extraParams.
func NewStatSum(minItems int, extraParams string, filterIDs []string) (StatMetric, error) {
	return &StatSum{Events: make(map[string]*StatWithCompress),
		MinItems: minItems, FieldName: extraParams, FilterIDs: filterIDs}, nil
}
@@ -847,6 +1046,36 @@ func (sum *StatSum) GetFilterIDs() []string {
return sum.FilterIDs
}
// Compress is part of StatMetric interface; once Count reaches queueLen it
// folds all per-event entries into a single aggregated entry under defaultID.
func (sum *StatSum) Compress(queueLen int64, defaultID string) (eventIDs []string) {
	if sum.Count < queueLen { // below threshold: keep events as they are
		for id := range sum.Events {
			eventIDs = append(eventIDs, id)
		}
		return
	}
	// replace all events with one entry holding the rounded Sum/Count average
	stat := &StatWithCompress{
		Stat: utils.Round((sum.Sum / float64(sum.Count)),
			config.CgrConfig().GeneralCfg().RoundingDecimals, utils.ROUNDING_MIDDLE),
		CompressFactor: int(sum.Count),
	}
	sum.Events = map[string]*StatWithCompress{defaultID: stat}
	return []string{defaultID}
}
// GetCompressFactor is part of StatMetric interface; it records into events
// the largest compress factor seen for each event ID.
func (sum *StatSum) GetCompressFactor(events map[string]int) map[string]int {
	for id, val := range sum.Events {
		if cf, has := events[id]; !has || cf < val.CompressFactor {
			events[id] = val.CompressFactor
		}
	}
	return events
}
// NewStatAverage creates a new StatAverage metric averaging the field named by extraParams.
func NewStatAverage(minItems int, extraParams string, filterIDs []string) (StatMetric, error) {
	return &StatAverage{Events: make(map[string]*StatWithCompress),
		MinItems: minItems, FieldName: extraParams, FilterIDs: filterIDs}, nil
}
@@ -945,6 +1174,36 @@ func (avg *StatAverage) GetFilterIDs() []string {
return avg.FilterIDs
}
// Compress is part of StatMetric interface; once Count reaches queueLen it
// folds all per-event entries into a single aggregated entry under defaultID.
func (avg *StatAverage) Compress(queueLen int64, defaultID string) (eventIDs []string) {
	if avg.Count < queueLen { // below threshold: keep events as they are
		for id := range avg.Events {
			eventIDs = append(eventIDs, id)
		}
		return
	}
	// replace all events with one entry holding the rounded Sum/Count average
	stat := &StatWithCompress{
		Stat: utils.Round((avg.Sum / float64(avg.Count)),
			config.CgrConfig().GeneralCfg().RoundingDecimals, utils.ROUNDING_MIDDLE),
		CompressFactor: int(avg.Count),
	}
	avg.Events = map[string]*StatWithCompress{defaultID: stat}
	return []string{defaultID}
}
// GetCompressFactor is part of StatMetric interface; it records into events
// the largest compress factor seen for each event ID.
func (avg *StatAverage) GetCompressFactor(events map[string]int) map[string]int {
	for id, val := range avg.Events {
		if cf, has := events[id]; !has || cf < val.CompressFactor {
			events[id] = val.CompressFactor
		}
	}
	return events
}
// NewStatDistinct creates a new StatDistinct metric over the field named by extraParams.
func NewStatDistinct(minItems int, extraParams string, filterIDs []string) (StatMetric, error) {
	return &StatDistinct{Events: make(map[string]struct{}),
		MinItems: minItems, FieldName: extraParams, FilterIDs: filterIDs}, nil
}
@@ -1022,3 +1281,23 @@ func (sum *StatDistinct) LoadMarshaled(ms Marshaler, marshaled []byte) (err erro
func (sum *StatDistinct) GetFilterIDs() []string {
return sum.FilterIDs
}
// Compress is part of StatMetric interface; distinct events cannot be
// aggregated, so it only reports the IDs of the events currently held.
func (sum *StatDistinct) Compress(queueLen int64, defaultID string) (eventIDs []string) {
	for id := range sum.Events {
		eventIDs = append(eventIDs, id)
	}
	return
}
// GetCompressFactor is part of StatMetric interface; distinct events are never
// compressed, so every held event contributes a factor of at least 1.
func (sum *StatDistinct) GetCompressFactor(events map[string]int) map[string]int {
	for id := range sum.Events {
		if cf, has := events[id]; !has || cf < 1 {
			events[id] = 1
		}
	}
	return events
}