StoredStatQueue, DataManager as data storage manager

This commit is contained in:
DanB
2017-09-10 19:22:18 +02:00
parent 6391938eb6
commit 05ecfbdf0d
9 changed files with 252 additions and 93 deletions

76
engine/datamanager.go Normal file
View File

@@ -0,0 +1,76 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"github.com/cgrates/cgrates/cache"
"github.com/cgrates/cgrates/utils"
)
func NewDataManager(dataDB DataDB, ms Marshaler) *DataManager {
return &DataManager{dataDB: dataDB, ms: ms}
}
// DataManager is the data storage manager for CGRateS
// transparently manages data retrieval, further serialization and caching
type DataManager struct {
dataDB DataDB
ms Marshaler
}
// GetStatQueue retrieves a StatQueue from dataDB
// handles caching and deserialization of metrics
func (dm *DataManager) GetStatQueue(tenant, id string, skipCache bool, transactionID string) (sq *StatQueue, err error) {
key := utils.StatQueuePrefix + utils.ConcatenatedKey(tenant, id)
if !skipCache {
if x, ok := cache.Get(key); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return x.(*StatQueue), nil
}
}
ssq, err := dm.dataDB.GetStoredStatQueue(tenant, id)
if err != nil {
if err == utils.ErrNotFound {
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
}
return nil, err
}
if sq, err = ssq.AsStatQueue(dm.ms); err != nil {
return nil, err
}
cache.Set(key, sq, cacheCommit(transactionID), transactionID)
return
}
// SetStatQueue converts to StoredStatQueue and stores the result in dataDB
func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) {
ssq, err := NewStoredStatQueue(sq, dm.ms)
if err != nil {
return err
}
return dm.dataDB.SetStoredStatQueue(ssq)
}
// RemStatQueue removes the StoredStatQueue and clears the cache for StatQueue
func (dm *DataManager) RemStatQueue(tenant, id string, transactionID string) (err error) {
if err = dm.dataDB.RemStoredStatQueue(tenant, id); err != nil {
return
}
cache.RemKey(utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id), cacheCommit(transactionID), transactionID)
return
}

View File

@@ -1,28 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import ()
type TPDataManager struct {
tpDB DataDB
}
func (tpdm *TPDataManager) HasData(category, subject string) (bool, error) {
return tpdm.tpDB.HasData(category, subject)
}

View File

@@ -64,6 +64,72 @@ func (se StatEvent) AnswerTime(timezone string) (at time.Time, err error) {
return utils.ParseTimeDetectLayout(atStr, timezone)
}
// NewStoredStatQueue initiates a StoredStatQueue out of StatQueue
func NewStoredStatQueue(sq *StatQueue, ms Marshaler) (sSQ *StoredStatQueue, err error) {
sSQ = &StoredStatQueue{
Tenant: sq.Tenant,
ID: sq.ID,
SQItems: make([]struct {
EventID string
ExpiryTime *time.Time
}, len(sq.SQItems)),
SQMetrics: make(map[string][]byte, len(sq.SQMetrics)),
}
for i, sqItm := range sq.SQItems {
sSQ.SQItems[i] = sqItm
}
for metricID, metric := range sq.SQMetrics {
if marshaled, err := metric.Marshal(ms); err != nil {
return nil, err
} else {
sSQ.SQMetrics[metricID] = marshaled
}
}
return
}
// StoredStatQueue differs from StatQueue due to serialization of SQMetrics
type StoredStatQueue struct {
Tenant string
ID string
SQItems []struct {
EventID string // Bounded to the original StatEvent
ExpiryTime *time.Time // Used to auto-expire events
}
SQMetrics map[string][]byte
}
// SqID will compose the unique identifier for the StatQueue out of Tenant and ID
func (ssq *StoredStatQueue) SqID() string {
return utils.ConcatenatedKey(ssq.Tenant, ssq.ID)
}
// AsStatQueue converts into StatQueue unmarshaling SQMetrics
func (ssq *StoredStatQueue) AsStatQueue(ms Marshaler) (sq *StatQueue, err error) {
sq = &StatQueue{
Tenant: ssq.Tenant,
ID: ssq.ID,
SQItems: make([]struct {
EventID string
ExpiryTime *time.Time
}, len(ssq.SQItems)),
SQMetrics: make(map[string]StatMetric, len(ssq.SQMetrics)),
}
for i, sqItm := range ssq.SQItems {
sq.SQItems[i] = sqItm
}
for metricID, marshaled := range ssq.SQMetrics {
if metric, err := NewStatMetric(metricID); err != nil {
return nil, err
} else if err := metric.LoadFromMarshaled(ms, marshaled); err != nil {
return nil, err
} else {
sq.SQMetrics[metricID] = metric
}
}
return
}
// StatQueue represents an individual stats instance
type StatQueue struct {
Tenant string
@@ -77,6 +143,11 @@ type StatQueue struct {
dirty *bool // needs save
}
// SqID will compose the unique identifier for the StatQueue out of Tenant and ID
func (sq *StatQueue) SqID() string {
return utils.ConcatenatedKey(sq.Tenant, sq.ID)
}
/*
// GetSQStoredMetrics retrieves the data used for store to DB
func (sq *StatQueue) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) {

View File

@@ -89,8 +89,8 @@ var sTestsOnStorIT = []func(t *testing.T){
testOnStorITCRUDTiming,
testOnStorITCRUDHistory,
testOnStorITCRUDStructVersion,
testOnStorITCRUDSQStoredMetrics,
testOnStorITCRUDStatQueueProfile,
testOnStorITCRUDStoredStatQueue,
testOnStorITCRUDThresholdCfg,
}
@@ -1947,32 +1947,6 @@ func testOnStorITCRUDStructVersion(t *testing.T) {
}
}
func testOnStorITCRUDSQStoredMetrics(t *testing.T) {
sqm := &SQStoredMetrics{
SqID: "test",
SEvents: map[string]StatsEvent{},
SQItems: []*SQItem{},
SQMetrics: map[string][]byte{},
}
if _, rcvErr := onStor.GetSQStoredMetrics(sqm.SqID); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
if err := onStor.SetSQStoredMetrics(sqm); err != nil {
t.Error(err)
}
if rcv, err := onStor.GetSQStoredMetrics(sqm.SqID); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(sqm, rcv) {
t.Errorf("Expecting: %v, received: %v", sqm, rcv)
}
if err := onStor.RemSQStoredMetrics(sqm.SqID); err != nil {
t.Error(err)
}
if _, rcvErr := onStor.GetSQStoredMetrics(sqm.SqID); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
}
func testOnStorITCRUDStatQueueProfile(t *testing.T) {
timeTTL := time.Duration(0 * time.Second)
sq := &StatQueueProfile{
@@ -2016,6 +1990,50 @@ func testOnStorITCRUDStatQueueProfile(t *testing.T) {
}
}
func testOnStorITCRUDStoredStatQueue(t *testing.T) {
eTime := utils.TimePointer(time.Date(2013, 10, 1, 0, 0, 0, 0, time.UTC).Local())
asr := &StatASR{
Answered: 2,
Count: 3,
Events: map[string]bool{
"cgrates.org:ev1": true,
"cgrates.org:ev2": true,
"cgrates.org:ev3": false,
},
}
msrshled, err := asr.Marshal(onStor.Marshaler())
if err != nil {
t.Error(err)
}
sq := &StoredStatQueue{
Tenant: "cgrates.org",
ID: "testOnStorITCRUDStatQueue",
SQItems: []struct {
EventID string // Bounded to the original StatEvent
ExpiryTime *time.Time // Used to auto-expire events
}{{EventID: "cgrates.org:ev1", ExpiryTime: eTime},
{EventID: "cgrates.org:ev2", ExpiryTime: eTime},
{EventID: "cgrates.org:ev3", ExpiryTime: eTime}},
SQMetrics: map[string][]byte{
utils.MetaASR: msrshled,
},
}
if err := onStor.SetStoredStatQueue(sq); err != nil {
t.Error(err)
}
if rcv, err := onStor.GetStoredStatQueue(sq.Tenant, sq.ID); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(sq, rcv) {
t.Errorf("Expecting: %v, received: %v", sq, rcv)
}
if err := onStor.RemStoredStatQueue(sq.Tenant, sq.ID); err != nil {
t.Error(err)
}
if _, rcvErr := onStor.GetStoredStatQueue(sq.Tenant, sq.ID); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
}
func testOnStorITCRUDThresholdCfg(t *testing.T) {
timeMinSleep := time.Duration(0 * time.Second)
th := &ThresholdCfg{

View File

@@ -46,14 +46,16 @@ type StatMetric interface {
GetFloat64Value() (val float64)
AddEvent(ev StatEvent) error
RemEvent(evID string) error
Marshal(ms Marshaler) (marshaled []byte, err error)
LoadFromMarshaled(ms Marshaler, marshaled []byte) (err error)
}
func NewASR() (StatMetric, error) {
return new(ASRStat), nil
return new(StatASR), nil
}
// ASR implements AverageSuccessRatio metric
type ASRStat struct {
type StatASR struct {
Answered float64
Count float64
Events map[string]bool // map[EventID]Answered
@@ -61,7 +63,7 @@ type ASRStat struct {
}
// getValue returns asr.val
func (asr *ASRStat) getValue() float64 {
func (asr *StatASR) getValue() float64 {
if asr.val == nil {
if asr.Count == 0 {
asr.val = utils.Float64Pointer(float64(STATS_NA))
@@ -74,11 +76,11 @@ func (asr *ASRStat) getValue() float64 {
}
// GetValue returns the ASR value as part of StatMetric interface
func (asr *ASRStat) GetValue() (v interface{}) {
func (asr *StatASR) GetValue() (v interface{}) {
return asr.getValue()
}
func (asr *ASRStat) GetStringValue(fmtOpts string) (valStr string) {
func (asr *StatASR) GetStringValue(fmtOpts string) (valStr string) {
if asr.Count == 0 {
return utils.NOT_AVAILABLE
}
@@ -86,12 +88,12 @@ func (asr *ASRStat) GetStringValue(fmtOpts string) (valStr string) {
}
// GetFloat64Value is part of StatMetric interface
func (asr *ASRStat) GetFloat64Value() (val float64) {
func (asr *StatASR) GetFloat64Value() (val float64) {
return asr.getValue()
}
// AddEvent is part of StatMetric interface
func (asr *ASRStat) AddEvent(ev StatEvent) (err error) {
func (asr *StatASR) AddEvent(ev StatEvent) (err error) {
var answered bool
if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil &&
err != utils.ErrNotFound {
@@ -107,7 +109,7 @@ func (asr *ASRStat) AddEvent(ev StatEvent) (err error) {
return
}
func (asr *ASRStat) RemEvent(evID string) (err error) {
func (asr *StatASR) RemEvent(evID string) (err error) {
answered, has := asr.Events[evID]
if !has {
return utils.ErrNotFound
@@ -120,32 +122,46 @@ func (asr *ASRStat) RemEvent(evID string) (err error) {
return
}
func (asr *StatASR) Marshal(ms Marshaler) (marshaled []byte, err error) {
return ms.Marshal(asr)
}
func (asr *StatASR) LoadFromMarshaled(ms Marshaler, marshaled []byte) (err error) {
return
}
func NewACD() (StatMetric, error) {
return new(ACDStat), nil
return new(StatACD), nil
}
// ACD implements AverageCallDuration metric
type ACDStat struct {
type StatACD struct {
Sum time.Duration
Count int
}
func (acd *ACDStat) GetStringValue(fmtOpts string) (val string) {
func (acd *StatACD) GetStringValue(fmtOpts string) (val string) {
return
}
func (acd *ACDStat) GetValue() (v interface{}) {
func (acd *StatACD) GetValue() (v interface{}) {
return
}
func (acd *ACDStat) GetFloat64Value() (v float64) {
func (acd *StatACD) GetFloat64Value() (v float64) {
return float64(STATS_NA)
}
func (acd *ACDStat) AddEvent(ev StatEvent) (err error) {
func (acd *StatACD) AddEvent(ev StatEvent) (err error) {
return
}
func (acd *ACDStat) RemEvent(evID string) (err error) {
func (acd *StatACD) RemEvent(evID string) (err error) {
return
}
func (acd *StatACD) Marshal(ms Marshaler) (marshaled []byte, err error) {
return
}
func (acd *StatACD) LoadFromMarshaled(ms Marshaler, marshaled []byte) (err error) {
return
}

View File

@@ -115,6 +115,9 @@ type DataDB interface {
GetStatQueueProfile(sqID string) (sq *StatQueueProfile, err error)
SetStatQueueProfile(sq *StatQueueProfile) (err error)
RemStatQueueProfile(sqID string) (err error)
GetStoredStatQueue(tenant, id string) (sq *StoredStatQueue, err error)
SetStoredStatQueue(sq *StoredStatQueue) (err error)
RemStoredStatQueue(tenant, id string) (err error)
GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error)
SetThresholdCfg(th *ThresholdCfg) (err error)
RemThresholdCfg(ID string, transactionID string) (err error)

View File

@@ -1514,10 +1514,10 @@ func (ms *MapStorage) RemStatQueueProfile(scfID string) (err error) {
}
// GetStatQueue retrieves the stored metrics for a StatsQueue
func (ms *MapStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) {
func (ms *MapStorage) GetStoredStatQueue(tenant, id string) (sq *StoredStatQueue, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
values, ok := ms.dict[utils.StatQueuePrefix+sqID]
values, ok := ms.dict[utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id)]
if !ok {
return nil, utils.ErrNotFound
}
@@ -1526,7 +1526,7 @@ func (ms *MapStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) {
}
// SetStatQueue stores the metrics for a StatsQueue
func (ms *MapStorage) SetStatQueue(sq *StatQueue) (err error) {
func (ms *MapStorage) SetStoredStatQueue(sq *StoredStatQueue) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
var result []byte
@@ -1534,15 +1534,15 @@ func (ms *MapStorage) SetStatQueue(sq *StatQueue) (err error) {
if err != nil {
return err
}
ms.dict[utils.StatQueuePrefix+sq.ID] = result
ms.dict[utils.StatQueuePrefix+sq.SqID()] = result
return
}
// RemStatQueue removes a StatsQueue
func (ms *MapStorage) RemStatQueue(sqID string) (err error) {
func (ms *MapStorage) RemStoredStatQueue(tenant, id string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
delete(ms.dict, utils.StatQueuePrefix+sqID)
delete(ms.dict, utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id))
return
}

View File

@@ -2046,11 +2046,11 @@ func (ms *MongoStorage) RemStatQueueProfile(sqID string) (err error) {
return
}
// GetStatQueue retrieves a StatsQueue
func (ms *MongoStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) {
// GetStoredStatQueue retrieves a StoredStatQueue
func (ms *MongoStorage) GetStoredStatQueue(tenant, id string) (sq *StoredStatQueue, err error) {
session, col := ms.conn(colStQs)
defer session.Close()
if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil {
if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(&sq); err != nil {
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
}
@@ -2059,19 +2059,22 @@ func (ms *MongoStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) {
return
}
// SetStoredSQ stores the metrics for a StatsQueue
func (ms *MongoStorage) SetStatQueue(sq *StatQueue) (err error) {
// SetStoredStatQueue stores the metrics for a StoredStatQueue
func (ms *MongoStorage) SetStoredStatQueue(sq *StoredStatQueue) (err error) {
session, col := ms.conn(colStQs)
defer session.Close()
_, err = col.Upsert(bson.M{"id": sq.ID}, sq)
_, err = col.Upsert(bson.M{"tenant": sq.Tenant, "id": sq.ID}, sq)
return
}
// RemStatQueue removes stored metrics for a StatsQueue
func (ms *MongoStorage) RemStatQueue(sqmID string) (err error) {
// RemStatQueue removes stored metrics for a StoredStatQueue
func (ms *MongoStorage) RemStoredStatQueue(tenant, id string) (err error) {
session, col := ms.conn(colStQs)
defer session.Close()
err = col.Remove(bson.M{"sqid": sqmID})
err = col.Remove(bson.M{"tenant": tenant, "id": id})
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
}
return err
}

View File

@@ -1620,9 +1620,9 @@ func (rs *RedisStorage) RemStatQueueProfile(sqID string) (err error) {
return
}
// GetStatQueue retrieves the stored metrics for a StatsQueue
func (rs *RedisStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) {
key := utils.StatQueuePrefix + sqID
// GetStoredStatQueue retrieves the stored metrics for a StatsQueue
func (rs *RedisStorage) GetStoredStatQueue(tenant, id string) (sq *StoredStatQueue, err error) {
key := utils.StatQueuePrefix + utils.ConcatenatedKey(tenant, id)
var values []byte
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
if err == redis.ErrRespNil {
@@ -1636,19 +1636,19 @@ func (rs *RedisStorage) GetStatQueue(sqID string) (sq *StatQueue, err error) {
return
}
// SetStatQueue stores the metrics for a StatsQueue
func (rs *RedisStorage) SetStatQueue(sq *StatQueue) (err error) {
// SetStoredStatQueue stores the metrics for a StatsQueue
func (rs *RedisStorage) SetStoredStatQueue(sq *StoredStatQueue) (err error) {
var result []byte
result, err = rs.ms.Marshal(sq)
if err != nil {
return
}
return rs.Cmd("SET", utils.StatQueuePrefix+sq.ID, result).Err
return rs.Cmd("SET", utils.StatQueuePrefix+sq.SqID(), result).Err
}
// RemStatQueue removes a StatsQueue
func (rs *RedisStorage) RemStatQueue(sqID string) (err error) {
key := utils.StatQueuePrefix + sqID
func (rs *RedisStorage) RemStoredStatQueue(tenant, id string) (err error) {
key := utils.StatQueuePrefix + utils.ConcatenatedKey(tenant, id)
if err = rs.Cmd("DEL", key).Err; err != nil {
return
}