diff --git a/apier/v1/stats.go b/apier/v1/stats.go index 5af1ae918..9dbe0df36 100644 --- a/apier/v1/stats.go +++ b/apier/v1/stats.go @@ -17,6 +17,7 @@ along with this program. If not, see */ package v1 +/* import ( "reflect" "strings" @@ -136,3 +137,4 @@ func (apierV1 *ApierV1) RemStatConfig(attrs AttrGetStatsCfg, reply *string) erro return nil } +*/ diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index f230ed026..1197db21f 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -19,6 +19,7 @@ along with this program. If not, see */ package v1 +/* import ( "math/rand" "net/rpc" @@ -346,3 +347,4 @@ func BenchmarkStatSV1GetStringMetrics(b *testing.B) { } } } +*/ diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 556cbae4b..1a08940ce 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -39,7 +39,6 @@ import ( "github.com/cgrates/cgrates/scheduler" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/sessionmanager" - "github.com/cgrates/cgrates/stats" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) @@ -560,6 +559,7 @@ func startResourceService(internalRsChan, internalStatSConn chan rpcclient.RpcCl internalRsChan <- rsV1 } +/* // startStatService fires up the StatS func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dataDB engine.DataDB, ms engine.Marshaler, server *utils.Server, exitChan chan bool) { @@ -582,6 +582,7 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg server.RpcRegister(stsV1) internalStatSChan <- stsV1 } +*/ func startRpc(server *utils.Server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, @@ -696,17 +697,6 @@ func main() { // Init cache cache.NewCache(cfg.CacheConfig) - var ms engine.Marshaler - if ms, err = engine.NewMarshaler(cfg.DBDataEncoding); err != nil { - log.Fatalf("error initializing marshaler: ", err) - return - } - if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) - exitChan <- true - return - } - var dataDB engine.DataDB var loadDb engine.LoadStorage var cdrDb engine.CdrStorage @@ -857,9 +847,9 @@ func main() { internalStatSChan, cfg, dataDB, server, exitChan) } - if cfg.StatSCfg().Enabled { - go startStatService(internalStatSChan, cfg, dataDB, ms, server, exitChan) - } + //if cfg.StatSCfg().Enabled { + // go startStatService(internalStatSChan, cfg, dataDB, ms, server, exitChan) + //} // Serve rpc connections go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan, diff --git a/engine/libstats.go b/engine/libstats.go index aad179ba9..0fa6f8387 100755 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -19,6 +19,7 @@ package engine import ( "errors" + "sort" "time" "github.com/cgrates/cgrates/utils" @@ -30,14 +31,6 @@ type SQItem struct { ExpiryTime *time.Time // Used to auto-expire events } -// SQStoredMetrics contains metrics saved in DB -type SQStoredMetrics struct { - SqID string // StatsInstanceID - SEvents map[string]StatsEvent // Events used by SQItems - SQItems []*SQItem // SQItems - SQMetrics map[string][]byte -} - // StatsConfig represents the configuration of a StatsInstance in StatS type StatsConfig struct { ID string // QueueID @@ -78,3 +71,123 @@ func (se StatsEvent) AnswerTime(timezone string) (at time.Time, err error) { } return utils.ParseTimeDetectLayout(atStr, timezone) } + +// StatQueue represents an individual stats instance +type StatQueue struct { + ID string + SQItems []*SQItem // SQItems + SQMetrics map[string]StatsMetric + sqItems []*SQItem + sqPrfl *StatsConfig + dirty *bool // needs save +} + +/* +// GetSQStoredMetrics retrieves the data used for store to DB +func (sq *StatQueue) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) { + sq.RLock() + defer sq.RUnlock() + sEvents := make(map[string]engine.StatsEvent) + var sItems []*engine.SQItem + for _, sqItem := range sq.sqItems { // make sure event is properly retrieved from cache + ev := sq.sec.GetEvent(sqItem.EventID) + if ev == nil { + utils.Logger.Warning(fmt.Sprintf(" querying for storage eventID: %s, error: event not cached", + sqItem.EventID)) + continue + } + sEvents[sqItem.EventID] = ev + sItems = append(sItems, sqItem) + } + sqSM = &engine.SQStoredMetrics{ + SEvents: sEvents, + SQItems: sItems, + SQMetrics: make(map[string][]byte, len(sq.sqMetrics))} + for metricID, metric := range sq.sqMetrics { + var err error + if sqSM.SQMetrics[metricID], err = metric.GetMarshaled(sq.ms); err != nil { + utils.Logger.Warning(fmt.Sprintf(" querying for storage metricID: %s, error: %s", + metricID, err.Error())) + continue + } + } + return +} + +// ProcessEvent processes a StatsEvent, returns true if processed +func (sq *StatQueue) ProcessEvent(ev engine.StatsEvent) (err error) { + sq.Lock() + sq.remExpired() + sq.remOnQueueLength() + sq.addStatsEvent(ev) + sq.Unlock() + return +} + +// remExpired expires items in queue +func (sq *StatQueue) remExpired() { + var expIdx *int // index of last item to be expired + for i, item := range sq.sqItems { + if item.ExpiryTime == nil { + break + } + if item.ExpiryTime.After(time.Now()) { + break + } + sq.remEventWithID(item.EventID) + item = nil // garbage collected asap + expIdx = &i + } + if expIdx == nil { + return + } + nextValidIdx := *expIdx + 1 + sq.sqItems = sq.sqItems[nextValidIdx:] +} + +// remOnQueueLength rems elements based on QueueLength setting +func (sq *StatQueue) remOnQueueLength() { + if sq.cfg.QueueLength == 0 { + return + } + if len(sq.sqItems) == sq.cfg.QueueLength { // reached limit, rem first element + itm := sq.sqItems[0] + sq.remEventWithID(itm.EventID) + itm = nil + sq.sqItems = sq.sqItems[1:] + } +} + +// addStatsEvent computes metrics for an event +func (sq *StatQueue) addStatsEvent(ev engine.StatsEvent) { + evID := ev.ID() + for metricID, metric := range sq.sqMetrics { + if err := metric.AddEvent(ev); err != nil { + utils.Logger.Warning(fmt.Sprintf(" metricID: %s, add eventID: %s, error: %s", + metricID, evID, err.Error())) + } + } +} + +// remStatsEvent removes an event from metrics +func (sq *StatQueue) remEventWithID(evID string) { + ev := sq.sec.GetEvent(evID) + if ev == nil { + utils.Logger.Warning(fmt.Sprintf(" removing eventID: %s, error: event not cached", evID)) + return + } + for metricID, metric := range sq.sqMetrics { + if err := metric.RemEvent(ev); err != nil { + utils.Logger.Warning(fmt.Sprintf(" metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error())) + } + } +} +*/ + +// StatQueues is a sortable list of StatQueue +type StatQueues []*StatQueue + +// Sort is part of sort interface, sort based on Weight +func (sis StatQueues) Sort() { + sort.Slice(sis, func(i, j int) bool { return sis[i].sqPrfl.Weight > sis[j].sqPrfl.Weight }) +} diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 92853f240..9107ce2b0 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -117,9 +117,9 @@ type DataDB interface { GetStatsConfig(sqID string) (sq *StatsConfig, err error) SetStatsConfig(sq *StatsConfig) (err error) RemStatsConfig(sqID string) (err error) - GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error) - SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) - RemSQStoredMetrics(sqID string) (err error) + GetStatQueue(sqID string) (sqSM *StatQueue, err error) + SetStatQueue(sq *StatQueue) (err error) + RemStatQueue(sqID string) (err error) GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error) SetThresholdCfg(th *ThresholdCfg) (err error) RemThresholdCfg(ID string, transactionID string) (err error) diff --git a/engine/storage_map.go b/engine/storage_map.go index d79793221..a21983e0d 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1564,36 +1564,36 @@ func (ms *MapStorage) RemStatsConfig(scfID string) (err error) { return } -// GetSQStoredMetrics retrieves the stored metrics for a StatsQueue -func (ms *MapStorage) GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error) { +// GetStatQueue retrieves the stored metrics for a StatsQueue +func (ms *MapStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) { ms.mu.RLock() defer ms.mu.RUnlock() - values, ok := ms.dict[utils.SQStoredMetricsPrefix+sqID] + values, ok := ms.dict[utils.StatQueuePrefix+sqID] if !ok { return nil, utils.ErrNotFound } - err = ms.ms.Unmarshal(values, &sqSM) + err = ms.ms.Unmarshal(values, &sq) return } -// SetStoredSQ stores the metrics for a StatsQueue -func (ms *MapStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) { +// SetStatQueue stores the metrics for a StatsQueue +func (ms *MapStorage) SetStatQueue(sq *StatQueue) (err error) { ms.mu.Lock() defer ms.mu.Unlock() var result []byte - result, err = ms.ms.Marshal(sqSM) + result, err = ms.ms.Marshal(sq) if err != nil { return err } - ms.dict[utils.SQStoredMetricsPrefix+sqSM.SqID] = result + ms.dict[utils.StatQueuePrefix+sq.ID] = result return } -// RemSQStoredMetrics removes stored metrics for a StatsQueue -func (ms *MapStorage) RemSQStoredMetrics(sqID string) (err error) { +// RemStatQueue removes a StatsQueue +func (ms *MapStorage) RemStatQueue(sqID string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() - delete(ms.dict, utils.SQStoredMetricsPrefix+sqID) + delete(ms.dict, utils.StatQueuePrefix+sqID) return } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 4f965fb8e..230b47639 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -61,6 +61,7 @@ const ( colRFI = "request_filter_indexes" colTmg = "timings" colRes = "resources" + colStQs = "statqueues" ) var ( @@ -2082,11 +2083,11 @@ func (ms *MongoStorage) RemStatsConfig(sqID string) (err error) { return } -// GetSQStoredMetrics retrieves the stored metrics for a StatsQueue -func (ms *MongoStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics, err error) { - session, col := ms.conn(utils.SQStoredMetricsPrefix) +// GetStatQueue retrieves a StatsQueue +func (ms *MongoStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) { + session, col := ms.conn(colStQs) defer session.Close() - if err = col.Find(bson.M{"sqid": sqmID}).One(&sqSM); err != nil { + if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil { if err == mgo.ErrNotFound { err = utils.ErrNotFound } @@ -2096,16 +2097,16 @@ func (ms *MongoStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics, } // SetStoredSQ stores the metrics for a StatsQueue -func (ms *MongoStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) { - session, col := ms.conn(utils.SQStoredMetricsPrefix) +func (ms *MongoStorage) SetStatQueue(sq *StatQueue) (err error) { + session, col := ms.conn(colStQs) defer session.Close() - _, err = col.UpsertId(bson.M{"sqid": sqSM.SqID}, sqSM) + _, err = col.Upsert(bson.M{"id": sq.ID}, sq) return } -// RemSQStoredMetrics removes stored metrics for a StatsQueue -func (ms *MongoStorage) RemSQStoredMetrics(sqmID string) (err error) { - session, col := ms.conn(utils.SQStoredMetricsPrefix) +// RemStatQueue removes stored metrics for a StatsQueue +func (ms *MongoStorage) RemStatQueue(sqmID string) (err error) { + session, col := ms.conn(colStQs) defer session.Close() err = col.Remove(bson.M{"sqid": sqmID}) return err diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 9b335582d..7e1d98e09 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1628,9 +1628,9 @@ func (rs *RedisStorage) RemStatsConfig(sqID string) (err error) { return } -// GetSQStoredMetrics retrieves the stored metrics for a StatsQueue -func (rs *RedisStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics, err error) { - key := utils.SQStoredMetricsPrefix + sqmID +// GetStatQueue retrieves the stored metrics for a StatsQueue +func (rs *RedisStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) { + key := utils.StatQueuePrefix + sqID var values []byte if values, err = rs.Cmd("GET", key).Bytes(); err != nil { if err == redis.ErrRespNil { @@ -1638,25 +1638,25 @@ func (rs *RedisStorage) GetSQStoredMetrics(sqmID string) (sqSM *SQStoredMetrics, } return } - if err = rs.ms.Unmarshal(values, &sqSM); err != nil { + if err = rs.ms.Unmarshal(values, &sq); err != nil { return } return } -// SetStoredSQ stores the metrics for a StatsQueue -func (rs *RedisStorage) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) { +// SetStatQueue stores the metrics for a StatsQueue +func (rs *RedisStorage) SetStatQueue(sq *StatQueue) (err error) { var result []byte - result, err = rs.ms.Marshal(sqSM) + result, err = rs.ms.Marshal(sq) if err != nil { return } - return rs.Cmd("SET", utils.SQStoredMetricsPrefix+sqSM.SqID, result).Err + return rs.Cmd("SET", utils.StatQueuePrefix+sq.ID, result).Err } -// RemSQStoredMetrics removes stored metrics for a StatsQueue -func (rs *RedisStorage) RemSQStoredMetrics(sqmID string) (err error) { - key := utils.SQStoredMetricsPrefix + sqmID +// RemStatQueue removes a StatsQueue +func (rs *RedisStorage) RemStatQueue(sqID string) (err error) { + key := utils.StatQueuePrefix + sqID if err = rs.Cmd("DEL", key).Err; err != nil { return } diff --git a/stats/acd.go b/stats/acd.go deleted file mode 100644 index f2a6cdfb9..000000000 --- a/stats/acd.go +++ /dev/null @@ -1,63 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package stats - -import ( - "time" - - "github.com/cgrates/cgrates/engine" -) - -func NewACD() (StatsMetric, error) { - return new(ACD), nil -} - -// ACD implements AverageCallDuration metric -type ACD struct { - Sum time.Duration - Count int -} - -func (acd *ACD) GetStringValue(fmtOpts string) (val string) { - return -} - -func (acd *ACD) GetValue() (v interface{}) { - return -} - -func (acd *ACD) GetFloat64Value() (v float64) { - return float64(engine.STATS_NA) -} - -func (acd *ACD) AddEvent(ev engine.StatsEvent) (err error) { - return -} - -func (acd *ACD) RemEvent(ev engine.StatsEvent) (err error) { - return -} - -func (acd *ACD) GetMarshaled(ms engine.Marshaler) (vals []byte, err error) { - return -} - -func (acd *ACD) SetFromMarshaled(vals []byte, ms engine.Marshaler) (err error) { - return -} diff --git a/stats/asr.go b/stats/asr.go deleted file mode 100644 index 4b24c450f..000000000 --- a/stats/asr.go +++ /dev/null @@ -1,87 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package stats - -import ( - "fmt" - - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -func NewASR() (StatsMetric, error) { - return new(ASR), nil -} - -// ASR implements AverageSuccessRatio metric -type ASR struct { - Answered float64 - Count float64 -} - -func (asr *ASR) GetValue() (v interface{}) { - if asr.Count == 0 { - return float64(engine.STATS_NA) - } - return utils.Round((asr.Answered / asr.Count * 100), - config.CgrConfig().RoundingDecimals, utils.ROUNDING_MIDDLE) -} - -func (asr *ASR) GetStringValue(fmtOpts string) (valStr string) { - if asr.Count == 0 { - return utils.NOT_AVAILABLE - } - val := asr.GetValue().(float64) - return fmt.Sprintf("%v%%", val) // %v will automatically limit the number of decimals printed -} - -func (asr *ASR) GetFloat64Value() (val float64) { - return asr.GetValue().(float64) -} - -func (asr *ASR) AddEvent(ev engine.StatsEvent) (err error) { - if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil && - err != utils.ErrNotFound { - return err - } else if !at.IsZero() { - asr.Answered += 1 - } - asr.Count += 1 - return -} - -func (asr *ASR) RemEvent(ev engine.StatsEvent) (err error) { - if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil && - err != utils.ErrNotFound { - return err - } else if !at.IsZero() { - asr.Answered -= 1 - } - asr.Count -= 1 - return -} - -func (asr *ASR) GetMarshaled(ms engine.Marshaler) (vals []byte, err error) { - return ms.Marshal(asr) -} - -func (asr *ASR) SetFromMarshaled(vals []byte, ms engine.Marshaler) (err error) { - return ms.Unmarshal(vals, asr) -} diff --git a/stats/asr_test.go b/stats/asr_test.go deleted file mode 100644 index c0e1cb136..000000000 --- a/stats/asr_test.go +++ /dev/null @@ -1,85 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package stats - -import ( - "testing" - "time" - - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -func TestASRGetStringValue(t *testing.T) { - asr, _ := NewASR() - if strVal := asr.GetStringValue(""); strVal != utils.NOT_AVAILABLE { - t.Errorf("wrong asr value: %s", strVal) - } - ev := engine.StatsEvent{ - "AnswerTime": time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)} - asr.AddEvent(ev) - if strVal := asr.GetStringValue(""); strVal != "100%" { - t.Errorf("wrong asr value: %s", strVal) - } - asr.AddEvent(engine.StatsEvent{}) - asr.AddEvent(engine.StatsEvent{}) - if strVal := asr.GetStringValue(""); strVal != "33.33333%" { - t.Errorf("wrong asr value: %s", strVal) - } - asr.RemEvent(engine.StatsEvent{}) - if strVal := asr.GetStringValue(""); strVal != "50%" { - t.Errorf("wrong asr value: %s", strVal) - } - asr.RemEvent(ev) - if strVal := asr.GetStringValue(""); strVal != "0%" { - t.Errorf("wrong asr value: %s", strVal) - } - asr.RemEvent(engine.StatsEvent{}) - if strVal := asr.GetStringValue(""); strVal != utils.NOT_AVAILABLE { - t.Errorf("wrong asr value: %s", strVal) - } - -} - -func TestASRGetValue(t *testing.T) { - asr, _ := NewASR() - ev := engine.StatsEvent{ - "AnswerTime": time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), - } - asr.AddEvent(ev) - if v := asr.GetValue(); v != 100.0 { - t.Errorf("wrong asr value: %f", v) - } - asr.AddEvent(engine.StatsEvent{}) - asr.AddEvent(engine.StatsEvent{}) - if v := asr.GetValue(); v != 33.33333 { - t.Errorf("wrong asr value: %f", v) - } - asr.RemEvent(engine.StatsEvent{}) - if v := asr.GetValue(); v != 50.0 { - t.Errorf("wrong asr value: %f", v) - } - asr.RemEvent(ev) - if v := asr.GetValue(); v != 0.0 { - t.Errorf("wrong asr value: %f", v) - } - asr.RemEvent(engine.StatsEvent{}) - if v := asr.GetValue(); v != -1.0 { - t.Errorf("wrong asr value: %f", v) - } -} diff --git a/stats/eventcache.go b/stats/eventcache.go deleted file mode 100644 index 6c8d17302..000000000 --- a/stats/eventcache.go +++ /dev/null @@ -1,72 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package stats - -import ( - "sync" - - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -// NewStatsEventCache instantiates a StatsEventCache -func NewStatsEventCache() *StatsEventCache { - return &StatsEventCache{ - evCacheIdx: make(map[string]utils.StringMap), - evCache: make(map[string]engine.StatsEvent)} -} - -// StatsEventCache keeps a cache of StatsEvents which are referenced by StatsQueues -type StatsEventCache struct { - sync.RWMutex - evCacheIdx map[string]utils.StringMap // index events used in queues, map[eventID]map[queueID]bool - evCache map[string]engine.StatsEvent // cache for the processed events -} - -// Cache will cache an event and reference it in the index -func (sec *StatsEventCache) Cache(evID string, ev engine.StatsEvent, queueID string) { - if utils.IsSliceMember([]string{evID, queueID}, "") { - return - } - sec.Lock() - if _, hasIt := sec.evCache[evID]; !hasIt { - sec.evCache[evID] = ev - } - sec.evCacheIdx[evID][queueID] = true - sec.Unlock() -} - -func (sec *StatsEventCache) UnCache(evID string, ev engine.StatsEvent, queueID string) { - sec.Lock() - if _, hasIt := sec.evCache[evID]; !hasIt { - return - } - delete(sec.evCacheIdx[evID], queueID) - if len(sec.evCacheIdx[evID]) == 0 { - delete(sec.evCacheIdx, evID) - delete(sec.evCache, evID) - } - sec.Unlock() -} - -// GetEvent returns the event based on ID -func (sec *StatsEventCache) GetEvent(evID string) engine.StatsEvent { - sec.RLock() - defer sec.RUnlock() - return sec.evCache[evID] -} diff --git a/stats/metric.go b/stats/metric.go deleted file mode 100644 index 51ee293dd..000000000 --- a/stats/metric.go +++ /dev/null @@ -1,50 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package stats - -import ( - "fmt" - - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -// NewStatsMetrics instantiates the StatsMetrics -// cfg serves as general purpose container to pass config options to metric -func NewStatsMetric(metricID string) (sm StatsMetric, err error) { - metrics := map[string]func() (StatsMetric, error){ - utils.MetaASR: NewASR, - utils.MetaACD: NewACD, - } - if _, has := metrics[metricID]; !has { - return nil, fmt.Errorf("unsupported metric: %s", metricID) - } - return metrics[metricID]() -} - -// StatsMetric is the interface which a metric should implement -type StatsMetric interface { - GetValue() interface{} - GetStringValue(fmtOpts string) (val string) - GetFloat64Value() (val float64) - AddEvent(ev engine.StatsEvent) error - RemEvent(ev engine.StatsEvent) error - GetMarshaled(ms engine.Marshaler) (vals []byte, err error) - SetFromMarshaled(vals []byte, ms engine.Marshaler) (err error) // mostly used to load from DB -} diff --git a/stats/queue.go b/stats/queue.go deleted file mode 100644 index 18f9f3064..000000000 --- a/stats/queue.go +++ /dev/null @@ -1,189 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package stats - -import ( - "fmt" - "sort" - "sync" - "time" - - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -// StatQueues is a sortable list of StatQueue -type StatQueues []*StatQueue - -// Sort is part of sort interface, sort based on Weight -func (sis StatQueues) Sort() { - sort.Slice(sis, func(i, j int) bool { return sis[i].cfg.Weight > sis[j].cfg.Weight }) -} - -// remWithID removes the queue with ID from slice -func (sis StatQueues) remWithID(qID string) { - for i, q := range sis { - if q.cfg.ID == qID { - copy(sis[i:], sis[i+1:]) - sis[len(sis)-1] = nil - sis = sis[:len(sis)-1] - break // there can be only one item with ID - } - } -} - -// NewStatQueue instantiates a StatQueue -func NewStatQueue(sec *StatsEventCache, ms engine.Marshaler, - sqCfg *engine.StatsConfig, sqSM *engine.SQStoredMetrics) (si *StatQueue, err error) { - si = &StatQueue{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)} - for _, metricID := range sqCfg.Metrics { - if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { - return - } - } - if sqSM != nil { - for evID, ev := range sqSM.SEvents { - si.sec.Cache(evID, ev, si.cfg.ID) - } - si.sqItems = sqSM.SQItems - for metricID := range si.sqMetrics { - if _, has := si.sqMetrics[metricID]; !has { - if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { - return - } - } - if stored, has := sqSM.SQMetrics[metricID]; !has { - continue - } else if err = si.sqMetrics[metricID].SetFromMarshaled(stored, ms); err != nil { - return - } - } - } - return -} - -// StatQueue represents an individual stats instance -type StatQueue struct { - sync.RWMutex - dirty bool // needs save - sec *StatsEventCache - sqItems []*engine.SQItem - sqMetrics map[string]StatsMetric - ms engine.Marshaler // used to get/set Metrics - cfg *engine.StatsConfig -} - -// GetSQStoredMetrics retrieves the data used for store to DB -func (sq *StatQueue) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) { - sq.RLock() - defer sq.RUnlock() - sEvents := make(map[string]engine.StatsEvent) - var sItems []*engine.SQItem - for _, sqItem := range sq.sqItems { // make sure event is properly retrieved from cache - ev := sq.sec.GetEvent(sqItem.EventID) - if ev == nil { - utils.Logger.Warning(fmt.Sprintf(" querying for storage eventID: %s, error: event not cached", - sqItem.EventID)) - continue - } - sEvents[sqItem.EventID] = ev - sItems = append(sItems, sqItem) - } - sqSM = &engine.SQStoredMetrics{ - SEvents: sEvents, - SQItems: sItems, - SQMetrics: make(map[string][]byte, len(sq.sqMetrics))} - for metricID, metric := range sq.sqMetrics { - var err error - if sqSM.SQMetrics[metricID], err = metric.GetMarshaled(sq.ms); err != nil { - utils.Logger.Warning(fmt.Sprintf(" querying for storage metricID: %s, error: %s", - metricID, err.Error())) - continue - } - } - return -} - -// ProcessEvent processes a StatsEvent, returns true if processed -func (sq *StatQueue) ProcessEvent(ev engine.StatsEvent) (err error) { - sq.Lock() - sq.remExpired() - sq.remOnQueueLength() - sq.addStatsEvent(ev) - sq.Unlock() - return -} - -// remExpired expires items in queue -func (sq *StatQueue) remExpired() { - var expIdx *int // index of last item to be expired - for i, item := range sq.sqItems { - if item.ExpiryTime == nil { - break - } - if item.ExpiryTime.After(time.Now()) { - break - } - sq.remEventWithID(item.EventID) - item = nil // garbage collected asap - expIdx = &i - } - if expIdx == nil { - return - } - nextValidIdx := *expIdx + 1 - sq.sqItems = sq.sqItems[nextValidIdx:] -} - -// remOnQueueLength rems elements based on QueueLength setting -func (sq *StatQueue) remOnQueueLength() { - if sq.cfg.QueueLength == 0 { - return - } - if len(sq.sqItems) == sq.cfg.QueueLength { // reached limit, rem first element - itm := sq.sqItems[0] - sq.remEventWithID(itm.EventID) - itm = nil - sq.sqItems = sq.sqItems[1:] - } -} - -// addStatsEvent computes metrics for an event -func (sq *StatQueue) addStatsEvent(ev engine.StatsEvent) { - evID := ev.ID() - for metricID, metric := range sq.sqMetrics { - if err := metric.AddEvent(ev); err != nil { - utils.Logger.Warning(fmt.Sprintf(" metricID: %s, add eventID: %s, error: %s", - metricID, evID, err.Error())) - } - } -} - -// remStatsEvent removes an event from metrics -func (sq *StatQueue) remEventWithID(evID string) { - ev := sq.sec.GetEvent(evID) - if ev == nil { - utils.Logger.Warning(fmt.Sprintf(" removing eventID: %s, error: event not cached", evID)) - return - } - for metricID, metric := range sq.sqMetrics { - if err := metric.RemEvent(ev); err != nil { - utils.Logger.Warning(fmt.Sprintf(" metricID: %s, remove eventID: %s, error: %s", metricID, evID, err.Error())) - } - } -} diff --git a/stats/queue_test.go b/stats/queue_test.go deleted file mode 100644 index 560a1be2e..000000000 --- a/stats/queue_test.go +++ /dev/null @@ -1,44 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package stats - -import ( - "reflect" - "testing" - - "github.com/cgrates/cgrates/engine" -) - -func TestStatQueuesSort(t *testing.T) { - sInsts := StatQueues{ - &StatQueue{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}}, - &StatQueue{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}}, - &StatQueue{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}}, - &StatQueue{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}}, - } - sInsts.Sort() - eSInst := StatQueues{ - &StatQueue{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}}, - &StatQueue{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}}, - &StatQueue{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}}, - &StatQueue{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}}, - } - if !reflect.DeepEqual(eSInst, sInsts) { - t.Errorf("expecting: %+v, received: %+v", eSInst, sInsts) - } -} diff --git a/stats/service.go b/stats/service.go deleted file mode 100755 index 5e4417a0a..000000000 --- a/stats/service.go +++ /dev/null @@ -1,290 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package stats - -import ( - "errors" - "fmt" - "math/rand" - "reflect" - "strings" - "sync" - "time" - - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" -) - -func init() { - rand.Seed(time.Now().UnixNano()) -} - -// NewStatService initializes a StatService -func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval time.Duration) (ss *StatService, err error) { - ss = &StatService{dataDB: dataDB, ms: ms, storeInterval: storeInterval, - stopStoring: make(chan struct{}), evCache: NewStatsEventCache()} - sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsConfigPrefix) - if err != nil { - return nil, err - } - ss.queuesCache = make(map[string]*StatQueue) - ss.queues = make(StatQueues, 0) - for _, prfx := range sqPrfxs { - if q, err := ss.loadQueue(prfx[len(utils.StatsConfigPrefix):]); err != nil { - utils.Logger.Err(fmt.Sprintf(" failed loading quueue with id: <%s>, err: <%s>", - q.cfg.ID, err.Error())) - continue - } else { - ss.setQueue(q) - } - } - ss.queues.Sort() - go ss.dumpStoredMetrics() // start dumpStoredMetrics loop - return -} - -// StatService builds stats for events -type StatService struct { - sync.RWMutex - dataDB engine.DataDB - ms engine.Marshaler - storeInterval time.Duration - stopStoring chan struct{} - evCache *StatsEventCache // so we can pass it to queues - queuesCache map[string]*StatQueue // unordered db of StatQueues, used for fast queries - queues StatQueues // ordered list of StatQueues - -} - -// ListenAndServe loops keeps the service alive -func (ss *StatService) ListenAndServe(exitChan chan bool) error { - e := <-exitChan - exitChan <- e // put back for the others listening for shutdown request - return nil -} - -// Called to shutdown the service -// ToDo: improve with context, ie following http implementation -func (ss *StatService) Shutdown() error { - utils.Logger.Info(" service shutdown initialized") - close(ss.stopStoring) - ss.storeMetrics() - utils.Logger.Info(" service shutdown complete") - return nil -} - -// setQueue adds or modifies a queue into cache -// sort will reorder the ss.queues -func (ss *StatService) loadQueue(qID string) (q *StatQueue, err error) { - sq, err := ss.dataDB.GetStatsConfig(qID) - if err != nil { - return nil, err - } - var sqSM *engine.SQStoredMetrics - if sq.Store { - if sqSM, err = ss.dataDB.GetSQStoredMetrics(sq.ID); err != nil && err != utils.ErrNotFound { - return nil, err - } - } - return NewStatQueue(ss.evCache, ss.ms, sq, sqSM) -} - -func (ss *StatService) setQueue(q *StatQueue) { - ss.queuesCache[q.cfg.ID] = q - ss.queues = append(ss.queues, q) -} - -// remQueue will remove a queue based on it's ID -func (ss *StatService) remQueue(qID string) (si *StatQueue) { - si = ss.queuesCache[qID] - ss.queues.remWithID(qID) - delete(ss.queuesCache, qID) - return -} - -// store stores the necessary storedMetrics to dataDB -func (ss *StatService) storeMetrics() { - for _, si := range ss.queues { - if !si.cfg.Store || !si.dirty { // no need to save - continue - } - if siSM := si.GetStoredMetrics(); siSM != nil { - if err := ss.dataDB.SetSQStoredMetrics(siSM); err != nil { - utils.Logger.Warning( - fmt.Sprintf(" failed saving StoredMetrics for QueueID: %s, error: %s", - si.cfg.ID, err.Error())) - } - } - // randomize the CPU load and give up thread control - time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) - } - return -} - -// dumpStoredMetrics regularly dumps metrics to dataDB -func (ss *StatService) dumpStoredMetrics() { - for { - select { - case <-ss.stopStoring: - return - } - ss.storeMetrics() - time.Sleep(ss.storeInterval) - } -} - -// processEvent processes a StatsEvent through the queues and caches it when needed -func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) { - evStatsID := ev.ID() - if evStatsID == "" { // ID is mandatory - return errors.New("missing ID field") - } - for _, stInst := range ss.queues { - if err := stInst.ProcessEvent(ev); err != nil { - utils.Logger.Warning( - fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s", - stInst.cfg.ID, evStatsID, err.Error())) - } - if stInst.cfg.Blocker { - break - } - } - return -} - -// V1ProcessEvent implements StatV1 method for processing an Event -func (ss *StatService) V1ProcessEvent(ev engine.StatsEvent, reply *string) (err error) { - if err = ss.processEvent(ev); err == nil { - *reply = utils.OK - } - return -} - -// V1GetQueueIDs returns list of queue IDs configured in the service -func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err error) { - if len(ss.queuesCache) == 0 { - return utils.ErrNotFound - } - for k := range ss.queuesCache { - *reply = append(*reply, k) - } - return -} - -// V1GetStringMetrics returns the metrics as string values -func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]string) (err error) { - sq, has := ss.queuesCache[queueID] - if !has { - return utils.ErrNotFound - } - metrics := make(map[string]string, len(sq.sqMetrics)) - for metricID, metric := range sq.sqMetrics { - metrics[metricID] = metric.GetStringValue("") - } - *reply = metrics - return -} - -// V1GetFloatMetrics returns the metrics as float64 values -func (ss *StatService) V1GetFloatMetrics(queueID string, reply *map[string]float64) (err error) { - sq, has := ss.queuesCache[queueID] - if !has { - return utils.ErrNotFound - } - metrics := make(map[string]float64, len(sq.sqMetrics)) - for metricID, metric := range sq.sqMetrics { - metrics[metricID] = metric.GetFloat64Value() - } - *reply = metrics - return -} - -// ArgsLoadQueues are the arguments passed to V1LoadQueues -type ArgsLoadQueues struct { - QueueIDs *[]string -} - -// V1LoadQueues loads the queues specified by qIDs into the service -// loads all if args.QueueIDs is nil -func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) { - qIDs := args.QueueIDs - if qIDs == nil { - sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsConfigPrefix) - if err != nil { - return err - } - queueIDs := make([]string, len(sqPrfxs)) - for i, prfx := range sqPrfxs { - queueIDs[i] = prfx[len(utils.StatsConfigPrefix):] - } - if len(queueIDs) != 0 { - qIDs = &queueIDs - } - } - if qIDs == nil || len(*qIDs) == 0 { - return utils.ErrNotFound - } - var sQs []*StatQueue // cache here so we lock only later when data available - for _, qID := range *qIDs { - if _, hasPrev := ss.queuesCache[qID]; hasPrev { - continue // don't overwrite previous, could be extended in the future by carefully checking cached events - } - if q, err := ss.loadQueue(qID); err != nil { - utils.Logger.Err(fmt.Sprintf(" failed loading quueue with id: <%s>, err: <%s>", - q.cfg.ID, err.Error())) - continue - } else { - sQs = append(sQs, q) - } - } - ss.Lock() - for _, q := range sQs { - ss.setQueue(q) - } - ss.queues.Sort() - ss.Unlock() - *reply = utils.OK - return -} - -// Call implements rpcclient.RpcClientConnection interface for internal RPC -// here for testing purposes -func (ss *StatService) Call(serviceMethod string, args interface{}, reply interface{}) error { - methodSplit := strings.Split(serviceMethod, ".") - if len(methodSplit) != 2 { - return rpcclient.ErrUnsupporteServiceMethod - } - method := reflect.ValueOf(ss).MethodByName(methodSplit[0][len(methodSplit[0])-2:] + methodSplit[1]) - if !method.IsValid() { - return rpcclient.ErrUnsupporteServiceMethod - } - params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)} - ret := method.Call(params) - if len(ret) != 1 { - return utils.ErrServerError - } - if ret[0].Interface() == nil { - return nil - } - err, ok := ret[0].Interface().(error) - if !ok { - return utils.ErrServerError - } - return err -} diff --git a/stats/service_test.go b/stats/service_test.go deleted file mode 100644 index b0b580017..000000000 --- a/stats/service_test.go +++ /dev/null @@ -1,77 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package stats - -import ( - "testing" - "time" - - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -func TestReqFilterPassStatS(t *testing.T) { - if cgrCfg := config.CgrConfig(); cgrCfg == nil { - cgrCfg, _ = config.NewDefaultCGRConfig() - config.SetCgrConfig(cgrCfg) - } - dataStorage, _ := engine.NewMapStorage() - dataStorage.SetStatsConfig( - &engine.StatsConfig{ID: "CDRST1", - Filters: []*engine.RequestFilter{ - &engine.RequestFilter{Type: engine.MetaString, FieldName: "Tenant", - Values: []string{"cgrates.org"}}}, - Metrics: []string{utils.MetaASR}}) - statS, err := NewStatService(dataStorage, dataStorage.Marshaler(), 0) - if err != nil { - t.Fatal(err) - } - var replyStr string - if err := statS.Call("StatSV1.LoadQueues", ArgsLoadQueues{}, - &replyStr); err != nil { - t.Error(err) - } else if replyStr != utils.OK { - t.Errorf("reply received: %s", replyStr) - } - cdr := &engine.CDR{ - Tenant: "cgrates.org", - Category: "call", - AnswerTime: time.Now(), - SetupTime: time.Now(), - Usage: 10 * time.Second, - Cost: 10, - Supplier: "suppl1", - DisconnectCause: "NORMAL_CLEARNING", - } - cdrMp, _ := cdr.AsMapStringIface() - cdrMp[utils.ID] = "event1" - if err := statS.processEvent(cdrMp); err != nil { - t.Error(err) - } - rf, err := engine.NewRequestFilter(engine.MetaStatS, "", - []string{"CDRST1:*min_asr:20"}) - if err != nil { - t.Fatal(err) - } - if passes, err := rf.Pass(cdr, "", statS); err != nil { - t.Error(err) - } else if !passes { - t.Error("Not passing") - } -} diff --git a/utils/consts.go b/utils/consts.go index 8c9b510a0..492c3d9c8 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -255,9 +255,9 @@ const ( LOG_ERR = "ler_" LOG_CDR = "cdr_" LOG_MEDIATED_CDR = "mcd_" - SQStoredMetricsPrefix = "ssm_" StatsConfigPrefix = "scf_" ThresholdCfgPrefix = "thc_" + StatQueuePrefix = "stq_" LOADINST_KEY = "load_history" SESSION_MANAGER_SOURCE = "SMR" MEDIATOR_SOURCE = "MED"