diff --git a/engine/datamanager.go b/engine/datamanager.go new file mode 100644 index 000000000..e2dcad41b --- /dev/null +++ b/engine/datamanager.go @@ -0,0 +1,76 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "github.com/cgrates/cgrates/cache" + "github.com/cgrates/cgrates/utils" +) + +func NewDataManager(dataDB DataDB, ms Marshaler) *DataManager { + return &DataManager{dataDB: dataDB, ms: ms} +} + +// DataManager is the data storage manager for CGRateS +// transparently manages data retrieval, further serialization and caching +type DataManager struct { + dataDB DataDB + ms Marshaler +} + +// GetStatQueue retrieves a StatQueue from dataDB +// handles caching and deserialization of metrics +func (dm *DataManager) GetStatQueue(tenant, id string, skipCache bool, transactionID string) (sq *StatQueue, err error) { + key := utils.StatQueuePrefix + utils.ConcatenatedKey(tenant, id) + if !skipCache { + if x, ok := cache.Get(key); ok { + if x == nil { + return nil, utils.ErrNotFound + } + return x.(*StatQueue), nil + } + } + ssq, err := dm.dataDB.GetStoredStatQueue(tenant, id) + if err != nil { + if err == utils.ErrNotFound { + cache.Set(key, nil, cacheCommit(transactionID), transactionID) + } + return nil, err + } + if sq, err = ssq.AsStatQueue(dm.ms); err != nil { + return nil, err + } + cache.Set(key, sq, cacheCommit(transactionID), transactionID) + return +} + +// SetStatQueue converts to StoredStatQueue and stores the result in dataDB +func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) { + ssq, err := NewStoredStatQueue(sq, dm.ms) + if err != nil { + return err + } + return dm.dataDB.SetStoredStatQueue(ssq) +} + +// RemStatQueue removes the StoredStatQueue and clears the cache for StatQueue +func (dm *DataManager) RemStatQueue(tenant, id string, transactionID string) (err error) { + if err = dm.dataDB.RemStoredStatQueue(tenant, id); err != nil { + return + } + cache.RemKey(utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(transactionID), transactionID) + return +} diff --git a/engine/datamanagers.go b/engine/datamanagers.go deleted file mode 100644 index 7c3a050d0..000000000 --- a/engine/datamanagers.go +++ /dev/null @@ -1,28 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package engine - -import () - -type TPDataManager struct { - tpDB DataDB -} - -func (tpdm *TPDataManager) HasData(category, subject string) (bool, error) { - return tpdm.tpDB.HasData(category, subject) -} diff --git a/engine/libstats.go b/engine/libstats.go index 4bd6a0bb3..c445d7081 100755 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -64,6 +64,72 @@ func (se StatEvent) AnswerTime(timezone string) (at time.Time, err error) { return utils.ParseTimeDetectLayout(atStr, timezone) } +// NewStoredStatQueue initiates a StoredStatQueue out of StatQueue +func NewStoredStatQueue(sq *StatQueue, ms Marshaler) (sSQ *StoredStatQueue, err error) { + sSQ = &StoredStatQueue{ + Tenant: sq.Tenant, + ID: sq.ID, + SQItems: make([]struct { + EventID string + ExpiryTime *time.Time + }, len(sq.SQItems)), + SQMetrics: make(map[string][]byte, len(sq.SQMetrics)), + } + for i, sqItm := range sq.SQItems { + sSQ.SQItems[i] = sqItm + } + for metricID, metric := range sq.SQMetrics { + if marshaled, err := metric.Marshal(ms); err != nil { + return nil, err + } else { + sSQ.SQMetrics[metricID] = marshaled + } + } + return +} + +// StoredStatQueue differs from StatQueue due to serialization of SQMetrics +type StoredStatQueue struct { + Tenant string + ID string + SQItems []struct { + EventID string // Bounded to the original StatEvent + ExpiryTime *time.Time // Used to auto-expire events + } + SQMetrics map[string][]byte +} + +// SqID will compose the unique identifier for the StatQueue out of Tenant and ID +func (ssq *StoredStatQueue) SqID() string { + return utils.ConcatenatedKey(ssq.Tenant, ssq.ID) +} + +// AsStatQueue converts into StatQueue unmarshaling SQMetrics +func (ssq *StoredStatQueue) AsStatQueue(ms Marshaler) (sq *StatQueue, err error) { + sq = &StatQueue{ + Tenant: ssq.Tenant, + ID: ssq.ID, + SQItems: make([]struct { + EventID string + ExpiryTime *time.Time + }, len(ssq.SQItems)), + SQMetrics: make(map[string]StatMetric, len(ssq.SQMetrics)), + } + for i, sqItm := range ssq.SQItems { + sq.SQItems[i] = sqItm + } + for metricID, marshaled := range ssq.SQMetrics { + if metric, err := NewStatMetric(metricID); err != nil { + return nil, err + } else if err := metric.LoadFromMarshaled(ms, marshaled); err != nil { + return nil, err + } else { + sq.SQMetrics[metricID] = metric + } + } + return +} + // StatQueue represents an individual stats instance type StatQueue struct { Tenant string @@ -77,6 +143,11 @@ type StatQueue struct { dirty *bool // needs save } +// SqID will compose the unique identifier for the StatQueue out of Tenant and ID +func (sq *StatQueue) SqID() string { + return utils.ConcatenatedKey(sq.Tenant, sq.ID) +} + /* // GetSQStoredMetrics retrieves the data used for store to DB func (sq *StatQueue) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) { diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 1faa13d19..f5b4709a1 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -89,8 +89,8 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITCRUDTiming, testOnStorITCRUDHistory, testOnStorITCRUDStructVersion, - testOnStorITCRUDSQStoredMetrics, testOnStorITCRUDStatQueueProfile, + testOnStorITCRUDStoredStatQueue, testOnStorITCRUDThresholdCfg, } @@ -1947,32 +1947,6 @@ func testOnStorITCRUDStructVersion(t *testing.T) { } } -func testOnStorITCRUDSQStoredMetrics(t *testing.T) { - sqm := &SQStoredMetrics{ - SqID: "test", - SEvents: map[string]StatsEvent{}, - SQItems: []*SQItem{}, - SQMetrics: map[string][]byte{}, - } - if _, rcvErr := onStor.GetSQStoredMetrics(sqm.SqID); rcvErr != utils.ErrNotFound { - t.Error(rcvErr) - } - if err := onStor.SetSQStoredMetrics(sqm); err != nil { - t.Error(err) - } - if rcv, err := onStor.GetSQStoredMetrics(sqm.SqID); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(sqm, rcv) { - t.Errorf("Expecting: %v, received: %v", sqm, rcv) - } - if err := onStor.RemSQStoredMetrics(sqm.SqID); err != nil { - t.Error(err) - } - if _, rcvErr := onStor.GetSQStoredMetrics(sqm.SqID); rcvErr != utils.ErrNotFound { - t.Error(rcvErr) - } -} - func testOnStorITCRUDStatQueueProfile(t *testing.T) { timeTTL := time.Duration(0 * time.Second) sq := &StatQueueProfile{ @@ -2016,6 +1990,50 @@ func testOnStorITCRUDStatQueueProfile(t *testing.T) { } } +func testOnStorITCRUDStoredStatQueue(t *testing.T) { + eTime := utils.TimePointer(time.Date(2013, 10, 1, 0, 0, 0, 0, time.UTC).Local()) + asr := &StatASR{ + Answered: 2, + Count: 3, + Events: map[string]bool{ + "cgrates.org:ev1": true, + "cgrates.org:ev2": true, + "cgrates.org:ev3": false, + }, + } + msrshled, err := asr.Marshal(onStor.Marshaler()) + if err != nil { + t.Error(err) + } + sq := &StoredStatQueue{ + Tenant: "cgrates.org", + ID: "testOnStorITCRUDStatQueue", + SQItems: []struct { + EventID string // Bounded to the original StatEvent + ExpiryTime *time.Time // Used to auto-expire events + }{{EventID: "cgrates.org:ev1", ExpiryTime: eTime}, + {EventID: "cgrates.org:ev2", ExpiryTime: eTime}, + {EventID: "cgrates.org:ev3", ExpiryTime: eTime}}, + SQMetrics: map[string][]byte{ + utils.MetaASR: msrshled, + }, + } + if err := onStor.SetStoredStatQueue(sq); err != nil { + t.Error(err) + } + if rcv, err := onStor.GetStoredStatQueue(sq.Tenant, sq.ID); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(sq, rcv) { + t.Errorf("Expecting: %v, received: %v", sq, rcv) + } + if err := onStor.RemStoredStatQueue(sq.Tenant, sq.ID); err != nil { + t.Error(err) + } + if _, rcvErr := onStor.GetStoredStatQueue(sq.Tenant, sq.ID); rcvErr != utils.ErrNotFound { + t.Error(rcvErr) + } +} + func testOnStorITCRUDThresholdCfg(t *testing.T) { timeMinSleep := time.Duration(0 * time.Second) th := &ThresholdCfg{ diff --git a/engine/statmetrics.go b/engine/statmetrics.go index 7fc970dc2..2122a6422 100644 --- a/engine/statmetrics.go +++ b/engine/statmetrics.go @@ -46,14 +46,16 @@ type StatMetric interface { GetFloat64Value() (val float64) AddEvent(ev StatEvent) error RemEvent(evID string) error + Marshal(ms Marshaler) (marshaled []byte, err error) + LoadFromMarshaled(ms Marshaler, marshaled []byte) (err error) } func NewASR() (StatMetric, error) { - return new(ASRStat), nil + return new(StatASR), nil } // ASR implements AverageSuccessRatio metric -type ASRStat struct { +type StatASR struct { Answered float64 Count float64 Events map[string]bool // map[EventID]Answered @@ -61,7 +63,7 @@ type ASRStat struct { } // getValue returns asr.val -func (asr *ASRStat) getValue() float64 { +func (asr *StatASR) getValue() float64 { if asr.val == nil { if asr.Count == 0 { asr.val = utils.Float64Pointer(float64(STATS_NA)) @@ -74,11 +76,11 @@ func (asr *ASRStat) getValue() float64 { } // GetValue returns the ASR value as part of StatMetric interface -func (asr *ASRStat) GetValue() (v interface{}) { +func (asr *StatASR) GetValue() (v interface{}) { return asr.getValue() } -func (asr *ASRStat) GetStringValue(fmtOpts string) (valStr string) { +func (asr *StatASR) GetStringValue(fmtOpts string) (valStr string) { if asr.Count == 0 { return utils.NOT_AVAILABLE } @@ -86,12 +88,12 @@ func (asr *ASRStat) GetStringValue(fmtOpts string) (valStr string) { } // GetFloat64Value is part of StatMetric interface -func (asr *ASRStat) GetFloat64Value() (val float64) { +func (asr *StatASR) GetFloat64Value() (val float64) { return asr.getValue() } // AddEvent is part of StatMetric interface -func (asr *ASRStat) AddEvent(ev StatEvent) (err error) { +func (asr *StatASR) AddEvent(ev StatEvent) (err error) { var answered bool if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil && err != utils.ErrNotFound { @@ -107,7 +109,7 @@ func (asr *ASRStat) AddEvent(ev StatEvent) (err error) { return } -func (asr *ASRStat) RemEvent(evID string) (err error) { +func (asr *StatASR) RemEvent(evID string) (err error) { answered, has := asr.Events[evID] if !has { return utils.ErrNotFound @@ -120,32 +122,46 @@ func (asr *ASRStat) RemEvent(evID string) (err error) { return } +func (asr *StatASR) Marshal(ms Marshaler) (marshaled []byte, err error) { + return ms.Marshal(asr) +} +func (asr *StatASR) LoadFromMarshaled(ms Marshaler, marshaled []byte) (err error) { + return +} + func NewACD() (StatMetric, error) { - return new(ACDStat), nil + return new(StatACD), nil } // ACD implements AverageCallDuration metric -type ACDStat struct { +type StatACD struct { Sum time.Duration Count int } -func (acd *ACDStat) GetStringValue(fmtOpts string) (val string) { +func (acd *StatACD) GetStringValue(fmtOpts string) (val string) { return } -func (acd *ACDStat) GetValue() (v interface{}) { +func (acd *StatACD) GetValue() (v interface{}) { return } -func (acd *ACDStat) GetFloat64Value() (v float64) { +func (acd *StatACD) GetFloat64Value() (v float64) { return float64(STATS_NA) } -func (acd *ACDStat) AddEvent(ev StatEvent) (err error) { +func (acd *StatACD) AddEvent(ev StatEvent) (err error) { return } -func (acd *ACDStat) RemEvent(evID string) (err error) { +func (acd *StatACD) RemEvent(evID string) (err error) { + return +} + +func (acd *StatACD) Marshal(ms Marshaler) (marshaled []byte, err error) { + return +} +func (acd *StatACD) LoadFromMarshaled(ms Marshaler, marshaled []byte) (err error) { return } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index d7e8230e1..1f127bc40 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -115,6 +115,9 @@ type DataDB interface { GetStatQueueProfile(sqID string) (sq *StatQueueProfile, err error) SetStatQueueProfile(sq *StatQueueProfile) (err error) RemStatQueueProfile(sqID string) (err error) + GetStoredStatQueue(tenant, id string) (sq *StoredStatQueue, err error) + SetStoredStatQueue(sq *StoredStatQueue) (err error) + RemStoredStatQueue(tenant, id string) (err error) GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error) SetThresholdCfg(th *ThresholdCfg) (err error) RemThresholdCfg(ID string, transactionID string) (err error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 37e672144..f0499454a 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1514,10 +1514,10 @@ func (ms *MapStorage) RemStatQueueProfile(scfID string) (err error) { } // GetStatQueue retrieves the stored metrics for a StatsQueue -func (ms *MapStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) { +func (ms *MapStorage) GetStoredStatQueue(tenant, id string) (sq *StoredStatQueue, err error) { ms.mu.RLock() defer ms.mu.RUnlock() - values, ok := ms.dict[utils.StatQueuePrefix+sqID] + values, ok := ms.dict[utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id)] if !ok { return nil, utils.ErrNotFound } @@ -1526,7 +1526,7 @@ func (ms *MapStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) { } // SetStatQueue stores the metrics for a StatsQueue -func (ms *MapStorage) SetStatQueue(sq *StatQueue) (err error) { +func (ms *MapStorage) SetStoredStatQueue(sq *StoredStatQueue) (err error) { ms.mu.Lock() defer ms.mu.Unlock() var result []byte @@ -1534,15 +1534,15 @@ func (ms *MapStorage) SetStatQueue(sq *StatQueue) (err error) { if err != nil { return err } - ms.dict[utils.StatQueuePrefix+sq.ID] = result + ms.dict[utils.StatQueuePrefix+sq.SqID()] = result return } // RemStatQueue removes a StatsQueue -func (ms *MapStorage) RemStatQueue(sqID string) (err error) { +func (ms *MapStorage) RemStoredStatQueue(tenant, id string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() - delete(ms.dict, utils.StatQueuePrefix+sqID) + delete(ms.dict, utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id)) return } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index e530d7194..7d84a655d 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -2046,11 +2046,11 @@ func (ms *MongoStorage) RemStatQueueProfile(sqID string) (err error) { return } -// GetStatQueue retrieves a StatsQueue -func (ms *MongoStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) { +// GetStoredStatQueue retrieves a StoredStatQueue +func (ms *MongoStorage) GetStoredStatQueue(tenant, id string) (sq *StoredStatQueue, err error) { session, col := ms.conn(colStQs) defer session.Close() - if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil { + if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(&sq); err != nil { if err == mgo.ErrNotFound { err = utils.ErrNotFound } @@ -2059,19 +2059,22 @@ func (ms *MongoStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) { return } -// SetStoredSQ stores the metrics for a StatsQueue -func (ms *MongoStorage) SetStatQueue(sq *StatQueue) (err error) { +// SetStoredStatQueue stores the metrics for a StoredStatQueue +func (ms *MongoStorage) SetStoredStatQueue(sq *StoredStatQueue) (err error) { session, col := ms.conn(colStQs) defer session.Close() - _, err = col.Upsert(bson.M{"id": sq.ID}, sq) + _, err = col.Upsert(bson.M{"tenant": sq.Tenant, "id": sq.ID}, sq) return } -// RemStatQueue removes stored metrics for a StatsQueue -func (ms *MongoStorage) RemStatQueue(sqmID string) (err error) { +// RemStatQueue removes stored metrics for a StoredStatQueue +func (ms *MongoStorage) RemStoredStatQueue(tenant, id string) (err error) { session, col := ms.conn(colStQs) defer session.Close() - err = col.Remove(bson.M{"sqid": sqmID}) + err = col.Remove(bson.M{"tenant": tenant, "id": id}) + if err == mgo.ErrNotFound { + err = utils.ErrNotFound + } return err } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 5febb1000..6b295ca7d 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1620,9 +1620,9 @@ func (rs *RedisStorage) RemStatQueueProfile(sqID string) (err error) { return } -// GetStatQueue retrieves the stored metrics for a StatsQueue -func (rs *RedisStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) { - key := utils.StatQueuePrefix + sqID +// GetStoredStatQueue retrieves the stored metrics for a StatsQueue +func (rs *RedisStorage) GetStoredStatQueue(tenant, id string) (sq *StoredStatQueue, err error) { + key := utils.StatQueuePrefix + utils.ConcatenatedKey(tenant, id) var values []byte if values, err = rs.Cmd("GET", key).Bytes(); err != nil { if err == redis.ErrRespNil { @@ -1636,19 +1636,19 @@ func (rs *RedisStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) { return } -// SetStatQueue stores the metrics for a StatsQueue -func (rs *RedisStorage) SetStatQueue(sq *StatQueue) (err error) { +// SetStoredStatQueue stores the metrics for a StatsQueue +func (rs *RedisStorage) SetStoredStatQueue(sq *StoredStatQueue) (err error) { var result []byte result, err = rs.ms.Marshal(sq) if err != nil { return } - return rs.Cmd("SET", utils.StatQueuePrefix+sq.ID, result).Err + return rs.Cmd("SET", utils.StatQueuePrefix+sq.SqID(), result).Err } // RemStatQueue removes a StatsQueue -func (rs *RedisStorage) RemStatQueue(sqID string) (err error) { - key := utils.StatQueuePrefix + sqID +func (rs *RedisStorage) RemStoredStatQueue(tenant, id string) (err error) { + key := utils.StatQueuePrefix + utils.ConcatenatedKey(tenant, id) if err = rs.Cmd("DEL", key).Err; err != nil { return }