diff --git a/config/config_defaults.go b/config/config_defaults.go index 27018438e..8b544c852 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -484,6 +484,7 @@ const CGRATES_CFG_JSON = ` "stats": { // StatS config "enabled": false, // starts Stat service: . "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur> + "max_queue_lenght": 0, // used to compress data "thresholds_conns": [], // connections to ThresholdS for StatUpdates, empty to disable thresholds functionality: <""|*internal|x.y.z.y:1234> "indexed_selects":true, // enable profile matching exclusively on indexes //"string_indexed_fields": [], // query indexes based on these fields for faster processing diff --git a/config/config_json_test.go b/config/config_json_test.go index 1bd30e13a..46c6f66a3 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -800,6 +800,7 @@ func TestDfStatServiceJsonCfg(t *testing.T) { Enabled: utils.BoolPointer(false), Indexed_selects: utils.BoolPointer(true), Store_interval: utils.StringPointer(""), + Max_queue_lenght: utils.IntPointer(0), Thresholds_conns: &[]*HaPoolJsonCfg{}, String_indexed_fields: nil, Prefix_indexed_fields: &[]string{}, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 7e06b5d97..a96a750ea 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -437,6 +437,7 @@ type StatServJsonCfg struct { Enabled *bool Indexed_selects *bool Store_interval *string + Max_queue_lenght *int Thresholds_conns *[]*HaPoolJsonCfg String_indexed_fields *[]string Prefix_indexed_fields *[]string diff --git a/config/statscfg.go b/config/statscfg.go index 1144698d4..229870b1a 100644 --- a/config/statscfg.go +++ b/config/statscfg.go @@ -28,6 +28,7 @@ type StatSCfg struct { Enabled bool IndexedSelects bool StoreInterval time.Duration // Dump regularly from cache into dataDB + MaxQueueLenght int ThresholdSConns []*RemoteHost StringIndexedFields *[]string PrefixIndexedFields *[]string @@ -48,6 +49,9 @@ func (st *StatSCfg) loadFromJsonCfg(jsnCfg *StatServJsonCfg) (err error) { return err } } + if jsnCfg.Max_queue_lenght != nil { + st.MaxQueueLenght = *jsnCfg.Max_queue_lenght + } if jsnCfg.Thresholds_conns != nil { st.ThresholdSConns = make([]*RemoteHost, len(*jsnCfg.Thresholds_conns)) for idx, jsnHaCfg := range *jsnCfg.Thresholds_conns { diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index f42d0afd1..7fcf229a0 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -458,6 +458,7 @@ // "stats": { // StatS config // "enabled": false, // starts Stat service: . // "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur> +// "max_queue_lenght": 0, // used to compress data // "thresholds_conns": [], // connections to ThresholdS for StatUpdates, empty to disable thresholds functionality: <""|*internal|x.y.z.y:1234> // "indexed_selects":true, // enable profile matching exclusively on indexes // //"string_indexed_fields": [], // query indexes based on these fields for faster processing diff --git a/engine/libstats.go b/engine/libstats.go index de4e2da3c..a683f402c 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -56,11 +56,12 @@ type MetricWithFilters struct { // NewStoredStatQueue initiates a StoredStatQueue out of StatQueue func NewStoredStatQueue(sq *StatQueue, ms Marshaler) (sSQ *StoredStatQueue, err error) { sSQ = &StoredStatQueue{ - Tenant: sq.Tenant, - ID: sq.ID, - SQItems: make([]SQItem, len(sq.SQItems)), - SQMetrics: make(map[string][]byte, len(sq.SQMetrics)), - MinItems: sq.MinItems, + Tenant: sq.Tenant, + ID: sq.ID, + Compressed: sq.Compress(int64(config.CgrConfig().StatSCfg().MaxQueueLenght)), + SQItems: make([]SQItem, len(sq.SQItems)), + SQMetrics: make(map[string][]byte, len(sq.SQMetrics)), + MinItems: sq.MinItems, } for i, sqItm := range sq.SQItems { sSQ.SQItems[i] = sqItm @@ -77,11 +78,12 @@ func NewStoredStatQueue(sq *StatQueue, ms Marshaler) (sSQ *StoredStatQueue, err // StoredStatQueue differs from StatQueue due to serialization of SQMetrics type StoredStatQueue struct { - Tenant string - ID string - SQItems []SQItem - SQMetrics map[string][]byte - MinItems int + Tenant string + ID string + SQItems []SQItem + SQMetrics map[string][]byte + MinItems int + Compressed bool } // SqID will compose the unique identifier for the StatQueue out of Tenant and ID @@ -110,6 +112,9 @@ func (ssq *StoredStatQueue) AsStatQueue(ms Marshaler) (sq *StatQueue, err error) sq.SQMetrics[metricID] = metric } } + if ssq.Compressed { + sq.Expand() + } return } @@ -223,6 +228,69 @@ func (sq *StatQueue) addStatEvent(ev *utils.CGREvent, filterS *FilterS) (err err return } +func (sq *StatQueue) Compress(maxQL int64) bool { + if int64(len(sq.SQItems)) < maxQL || maxQL == 0 { + return false + } + var newSQItems []SQItem + SQMap := make(map[string]*time.Time) + idMap := make(map[string]struct{}) + defaultCompressID := utils.UUIDSha1Prefix() + defaultTTL := sq.SQItems[len(sq.SQItems)-1].ExpiryTime + + SQMap[defaultCompressID] = defaultTTL + for _, sqitem := range sq.SQItems { + SQMap[sqitem.EventID] = sqitem.ExpiryTime + } + + for _, m := range sq.SQMetrics { + for _, id := range m.Compress(maxQL, defaultCompressID) { + idMap[id] = struct{}{} + } + } + for k, _ := range idMap { + ttl, has := SQMap[k] + if !has { // log warning + ttl = defaultTTL + } + newSQItems = append(newSQItems, SQItem{ + EventID: k, + ExpiryTime: ttl, + }) + } + if sq.ttl != nil { + sort.Slice(newSQItems, func(i, j int) bool { + if newSQItems[i].ExpiryTime == nil { + return false + } + if newSQItems[j].ExpiryTime == nil { + return true + } + return newSQItems[i].ExpiryTime.Before(*(newSQItems[j].ExpiryTime)) + }) + } + sq.SQItems = newSQItems + return true +} + +func (sq *StatQueue) Expand() { + compressFactorMap := make(map[string]int) + for _, m := range sq.SQMetrics { + compressFactorMap = m.GetCompressFactor(compressFactorMap) + } + var newSQItems []SQItem + for _, sqi := range sq.SQItems { + cf, has := compressFactorMap[sqi.EventID] + if !has { + continue + } + for i := 0; i < cf; i++ { + newSQItems = append(newSQItems, sqi) + } + } + sq.SQItems = newSQItems +} + // StatQueues is a sortable list of StatQueue type StatQueues []*StatQueue diff --git a/engine/statmetrics.go b/engine/statmetrics.go index b26165617..c139f8c60 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -80,6 +80,8 @@ type StatMetric interface { Marshal(ms Marshaler) (marshaled []byte, err error) LoadMarshaled(ms Marshaler, marshaled []byte) (err error) GetFilterIDs() (filterIDs []string) + Compress(queueLen int64, defaultID string) (eventIDs []string) + GetCompressFactor(events map[string]int) map[string]int } func NewASR(minItems int, extraParams string, filterIDs []string) (StatMetric, error) { @@ -190,6 +192,36 @@ func (asr *StatASR) GetFilterIDs() []string { return asr.FilterIDs } +// Compress is part of StatMetric interface +func (asr *StatASR) Compress(queueLen int64, defaultID string) (eventIDs []string) { + if asr.Count < queueLen { + for id, _ := range asr.Events { + eventIDs = append(eventIDs, id) + } + return + } + stat := &StatWithCompress{ + Stat: utils.Round(asr.Answered/float64(asr.Count), + config.CgrConfig().GeneralCfg().RoundingDecimals, utils.ROUNDING_MIDDLE), + CompressFactor: int(asr.Count), + } + asr.Events = map[string]*StatWithCompress{defaultID: stat} + return []string{defaultID} +} + +// Compress is part of StatMetric interface +func (asr *StatASR) GetCompressFactor(events map[string]int) map[string]int { + for id, val := range asr.Events { + if _, has := events[id]; !has { + events[id] = val.CompressFactor + } + if events[id] < val.CompressFactor { + events[id] = val.CompressFactor + } + } + return events +} + func NewACD(minItems int, extraParams string, filterIDs []string) (StatMetric, error) { return &StatACD{Events: make(map[string]*DurationWithCompress), MinItems: minItems, FilterIDs: filterIDs}, nil } @@ -288,6 +320,35 @@ func (acd *StatACD) GetFilterIDs() []string { return acd.FilterIDs } +// Compress is part of StatMetric interface +func (acd *StatACD) Compress(queueLen int64, defaultID string) (eventIDs []string) { + if acd.Count < queueLen { + for id, _ := range acd.Events { + eventIDs = append(eventIDs, id) + } + return + } + stat := &DurationWithCompress{ + Duration: time.Duration(acd.Sum.Nanoseconds() / acd.Count), + CompressFactor: int(acd.Count), + } + acd.Events = map[string]*DurationWithCompress{defaultID: stat} + return []string{defaultID} +} + +// Compress is part of StatMetric interface +func (acd *StatACD) GetCompressFactor(events map[string]int) map[string]int { + for id, val := range acd.Events { + if _, has := events[id]; !has { + events[id] = val.CompressFactor + } + if events[id] < val.CompressFactor { + events[id] = val.CompressFactor + } + } + return events +} + func NewTCD(minItems int, extraParams string, filterIDs []string) (StatMetric, error) { return &StatTCD{Events: make(map[string]*DurationWithCompress), MinItems: minItems, FilterIDs: filterIDs}, nil } @@ -387,6 +448,35 @@ func (tcd *StatTCD) GetFilterIDs() []string { return tcd.FilterIDs } +// Compress is part of StatMetric interface +func (tcd *StatTCD) Compress(queueLen int64, defaultID string) (eventIDs []string) { + if tcd.Count < queueLen { + for id, _ := range tcd.Events { + eventIDs = append(eventIDs, id) + } + return + } + stat := &DurationWithCompress{ + Duration: time.Duration(tcd.Sum.Nanoseconds() / tcd.Count), + CompressFactor: int(tcd.Count), + } + tcd.Events = map[string]*DurationWithCompress{defaultID: stat} + return []string{defaultID} +} + +// Compress is part of StatMetric interface +func (tcd *StatTCD) GetCompressFactor(events map[string]int) map[string]int { + for id, val := range tcd.Events { + if _, has := events[id]; !has { + events[id] = val.CompressFactor + } + if events[id] < val.CompressFactor { + events[id] = val.CompressFactor + } + } + return events +} + func NewACC(minItems int, extraParams string, filterIDs []string) (StatMetric, error) { return &StatACC{Events: make(map[string]*StatWithCompress), MinItems: minItems, FilterIDs: filterIDs}, nil } @@ -481,6 +571,36 @@ func (acc *StatACC) GetFilterIDs() []string { return acc.FilterIDs } +// Compress is part of StatMetric interface +func (acc *StatACC) Compress(queueLen int64, defaultID string) (eventIDs []string) { + if acc.Count < queueLen { + for id, _ := range acc.Events { + eventIDs = append(eventIDs, id) + } + return + } + stat := &StatWithCompress{ + Stat: utils.Round((acc.Sum / float64(acc.Count)), + config.CgrConfig().GeneralCfg().RoundingDecimals, utils.ROUNDING_MIDDLE), + CompressFactor: int(acc.Count), + } + acc.Events = map[string]*StatWithCompress{defaultID: stat} + return []string{defaultID} +} + +// Compress is part of StatMetric interface +func (acc *StatACC) GetCompressFactor(events map[string]int) map[string]int { + for id, val := range acc.Events { + if _, has := events[id]; !has { + events[id] = val.CompressFactor + } + if events[id] < val.CompressFactor { + events[id] = val.CompressFactor + } + } + return events +} + func NewTCC(minItems int, extraParams string, filterIDs []string) (StatMetric, error) { return &StatTCC{Events: make(map[string]*StatWithCompress), MinItems: minItems, FilterIDs: filterIDs}, nil } @@ -577,6 +697,36 @@ func (tcc *StatTCC) GetFilterIDs() []string { return tcc.FilterIDs } +// Compress is part of StatMetric interface +func (tcc *StatTCC) Compress(queueLen int64, defaultID string) (eventIDs []string) { + if tcc.Count < queueLen { + for id, _ := range tcc.Events { + eventIDs = append(eventIDs, id) + } + return + } + stat := &StatWithCompress{ + Stat: utils.Round((tcc.Sum / float64(tcc.Count)), + config.CgrConfig().GeneralCfg().RoundingDecimals, utils.ROUNDING_MIDDLE), + CompressFactor: int(tcc.Count), + } + tcc.Events = map[string]*StatWithCompress{defaultID: stat} + return []string{defaultID} +} + +// Compress is part of StatMetric interface +func (tcc *StatTCC) GetCompressFactor(events map[string]int) map[string]int { + for id, val := range tcc.Events { + if _, has := events[id]; !has { + events[id] = val.CompressFactor + } + if events[id] < val.CompressFactor { + events[id] = val.CompressFactor + } + } + return events +} + func NewPDD(minItems int, extraParams string, filterIDs []string) (StatMetric, error) { return &StatPDD{Events: make(map[string]*DurationWithCompress), MinItems: minItems, FilterIDs: filterIDs}, nil } @@ -675,6 +825,35 @@ func (pdd *StatPDD) GetFilterIDs() []string { return pdd.FilterIDs } +// Compress is part of StatMetric interface +func (pdd *StatPDD) Compress(queueLen int64, defaultID string) (eventIDs []string) { + if pdd.Count < queueLen { + for id, _ := range pdd.Events { + eventIDs = append(eventIDs, id) + } + return + } + stat := &DurationWithCompress{ + Duration: time.Duration(pdd.Sum.Nanoseconds() / pdd.Count), + CompressFactor: int(pdd.Count), + } + pdd.Events = map[string]*DurationWithCompress{defaultID: stat} + return []string{defaultID} +} + +// Compress is part of StatMetric interface +func (pdd *StatPDD) GetCompressFactor(events map[string]int) map[string]int { + for id, val := range pdd.Events { + if _, has := events[id]; !has { + events[id] = val.CompressFactor + } + if events[id] < val.CompressFactor { + events[id] = val.CompressFactor + } + } + return events +} + func NewDDC(minItems int, extraParams string, filterIDs []string) (StatMetric, error) { return &StatDDC{Destinations: make(map[string]utils.StringMap), Events: make(map[string]string), MinItems: minItems, FilterIDs: filterIDs}, nil @@ -750,6 +929,26 @@ func (ddc *StatDDC) GetFilterIDs() []string { return ddc.FilterIDs } +func (ddc *StatDDC) Compress(queueLen int64, defaultID string) (eventIDs []string) { + for id, _ := range ddc.Events { + eventIDs = append(eventIDs, id) + } + return +} + +// Compress is part of StatMetric interface +func (ddc *StatDDC) GetCompressFactor(events map[string]int) map[string]int { + for id, _ := range ddc.Events { + if _, has := events[id]; !has { + events[id] = 1 + } + if events[id] < 1 { + events[id] = 1 + } + } + return events +} + func NewStatSum(minItems int, extraParams string, filterIDs []string) (StatMetric, error) { return &StatSum{Events: make(map[string]*StatWithCompress), MinItems: minItems, FieldName: extraParams, FilterIDs: filterIDs}, nil @@ -847,6 +1046,36 @@ func (sum *StatSum) GetFilterIDs() []string { return sum.FilterIDs } +// Compress is part of StatMetric interface +func (sum *StatSum) Compress(queueLen int64, defaultID string) (eventIDs []string) { + if sum.Count < queueLen { + for id, _ := range sum.Events { + eventIDs = append(eventIDs, id) + } + return + } + stat := &StatWithCompress{ + Stat: utils.Round((sum.Sum / float64(sum.Count)), + config.CgrConfig().GeneralCfg().RoundingDecimals, utils.ROUNDING_MIDDLE), + CompressFactor: int(sum.Count), + } + sum.Events = map[string]*StatWithCompress{defaultID: stat} + return []string{defaultID} +} + +// Compress is part of StatMetric interface +func (sum *StatSum) GetCompressFactor(events map[string]int) map[string]int { + for id, val := range sum.Events { + if _, has := events[id]; !has { + events[id] = val.CompressFactor + } + if events[id] < val.CompressFactor { + events[id] = val.CompressFactor + } + } + return events +} + func NewStatAverage(minItems int, extraParams string, filterIDs []string) (StatMetric, error) { return &StatAverage{Events: make(map[string]*StatWithCompress), MinItems: minItems, FieldName: extraParams, FilterIDs: filterIDs}, nil @@ -945,6 +1174,36 @@ func (avg *StatAverage) GetFilterIDs() []string { return avg.FilterIDs } +// Compress is part of StatMetric interface +func (avg *StatAverage) Compress(queueLen int64, defaultID string) (eventIDs []string) { + if avg.Count < queueLen { + for id, _ := range avg.Events { + eventIDs = append(eventIDs, id) + } + return + } + stat := &StatWithCompress{ + Stat: utils.Round((avg.Sum / float64(avg.Count)), + config.CgrConfig().GeneralCfg().RoundingDecimals, utils.ROUNDING_MIDDLE), + CompressFactor: int(avg.Count), + } + avg.Events = map[string]*StatWithCompress{defaultID: stat} + return []string{defaultID} +} + +// Compress is part of StatMetric interface +func (avg *StatAverage) GetCompressFactor(events map[string]int) map[string]int { + for id, val := range avg.Events { + if _, has := events[id]; !has { + events[id] = val.CompressFactor + } + if events[id] < val.CompressFactor { + events[id] = val.CompressFactor + } + } + return events +} + func NewStatDistinct(minItems int, extraParams string, filterIDs []string) (StatMetric, error) { return &StatDistinct{Events: make(map[string]struct{}), MinItems: minItems, FieldName: extraParams, FilterIDs: filterIDs}, nil @@ -1022,3 +1281,23 @@ func (sum *StatDistinct) LoadMarshaled(ms Marshaler, marshaled []byte) (err erro func (sum *StatDistinct) GetFilterIDs() []string { return sum.FilterIDs } + +func (sum *StatDistinct) Compress(queueLen int64, defaultID string) (eventIDs []string) { + for id, _ := range sum.Events { + eventIDs = append(eventIDs, id) + } + return +} + +// Compress is part of StatMetric interface +func (sum *StatDistinct) GetCompressFactor(events map[string]int) map[string]int { + for id, _ := range sum.Events { + if _, has := events[id]; !has { + events[id] = 1 + } + if events[id] < 1 { + events[id] = 1 + } + } + return events +}