mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add Get/Set CDRStatsQueue in DataManager
This commit is contained in:
committed by
Dan Christian Bogos
parent
24b59c4329
commit
eb629dfe70
@@ -104,7 +104,7 @@ func (sq *CDRStatsQueue) Save(db DataDB) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := db.SetCdrStatsQueue(sq); err != nil {
|
||||
if err := db.SetCdrStatsQueueDrv(sq); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("Error saving cdr stats queue id %s: %v", sq.GetId(), err))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -254,7 +254,7 @@ func (s *Stats) UpdateQueues(css []*CdrStats, out *int) error {
|
||||
if sq == nil {
|
||||
sq = NewCDRStatsQueue(cs)
|
||||
// load queue from storage if exists
|
||||
if saved, err := s.dm.DataDB().GetCdrStatsQueue(sq.GetId()); err == nil {
|
||||
if saved, err := s.dm.GetCdrStatsQueue(sq.GetId()); err == nil {
|
||||
sq.Load(saved)
|
||||
}
|
||||
s.setupQueueSaver(sq)
|
||||
|
||||
@@ -789,3 +789,11 @@ func (dm *DataManager) GetReqFilterIndexes(dbKey string) (indexes map[string]map
|
||||
func (dm *DataManager) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) {
|
||||
return dm.DataDB().SetReqFilterIndexesDrv(dbKey, indexes)
|
||||
}
|
||||
|
||||
func (dm *DataManager) GetCdrStatsQueue(key string) (sq *CDRStatsQueue, err error) {
|
||||
return dm.DataDB().GetCdrStatsQueueDrv(key)
|
||||
}
|
||||
|
||||
func (dm *DataManager) SetCdrStatsQueue(sq *CDRStatsQueue) (err error) {
|
||||
return dm.DataDB().SetCdrStatsQueueDrv(sq)
|
||||
}
|
||||
|
||||
@@ -1593,13 +1593,13 @@ func testOnStorITCRUDCdrStatsQueue(t *testing.T) {
|
||||
EventTime: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(),
|
||||
}},
|
||||
}
|
||||
if _, rcvErr := onStor.DataDB().GetCdrStatsQueue(sq.GetId()); rcvErr != utils.ErrNotFound {
|
||||
if _, rcvErr := onStor.GetCdrStatsQueue(sq.GetId()); rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
if err := onStor.DataDB().SetCdrStatsQueue(sq); err != nil {
|
||||
if err := onStor.SetCdrStatsQueue(sq); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rcv, err := onStor.DataDB().GetCdrStatsQueue(sq.GetId()); err != nil {
|
||||
if rcv, err := onStor.GetCdrStatsQueue(sq.GetId()); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(sq.Cdrs, rcv.Cdrs) {
|
||||
t.Errorf("Expecting: %v, received: %v", sq.Cdrs, rcv.Cdrs)
|
||||
|
||||
@@ -447,10 +447,10 @@ func TestStatsSaveRestoreQeue(t *testing.T) {
|
||||
conf: &CdrStats{Id: "TTT"},
|
||||
Cdrs: []*QCdr{&QCdr{Cost: 9.0}},
|
||||
}
|
||||
if err := dm.DataDB().SetCdrStatsQueue(sq); err != nil {
|
||||
if err := dm.SetCdrStatsQueue(sq); err != nil {
|
||||
t.Error("Error saving metric: ", err)
|
||||
}
|
||||
recovered, err := dm.DataDB().GetCdrStatsQueue(sq.GetId())
|
||||
recovered, err := dm.GetCdrStatsQueue(sq.GetId())
|
||||
if err != nil {
|
||||
t.Error("Error loading metric: ", err)
|
||||
}
|
||||
|
||||
@@ -85,8 +85,8 @@ type DataDB interface {
|
||||
GetAccount(string) (*Account, error)
|
||||
SetAccount(*Account) error
|
||||
RemoveAccount(string) error
|
||||
GetCdrStatsQueue(string) (*CDRStatsQueue, error)
|
||||
SetCdrStatsQueue(*CDRStatsQueue) error
|
||||
GetCdrStatsQueueDrv(string) (*CDRStatsQueue, error)
|
||||
SetCdrStatsQueueDrv(*CDRStatsQueue) error
|
||||
GetSubscribersDrv() (map[string]*SubscriberData, error)
|
||||
SetSubscriberDrv(string, *SubscriberData) error
|
||||
RemoveSubscriberDrv(string) error
|
||||
|
||||
@@ -540,7 +540,7 @@ func (ms *MapStorage) RemoveAccount(key string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetCdrStatsQueue(key string) (sq *CDRStatsQueue, err error) {
|
||||
func (ms *MapStorage) GetCdrStatsQueueDrv(key string) (sq *CDRStatsQueue, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
if values, ok := ms.dict[utils.CDR_STATS_QUEUE_PREFIX+key]; ok {
|
||||
@@ -552,7 +552,7 @@ func (ms *MapStorage) GetCdrStatsQueue(key string) (sq *CDRStatsQueue, err error
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) SetCdrStatsQueue(sq *CDRStatsQueue) (err error) {
|
||||
func (ms *MapStorage) SetCdrStatsQueueDrv(sq *CDRStatsQueue) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
result, err := ms.ms.Marshal(sq)
|
||||
|
||||
@@ -1030,7 +1030,7 @@ func (ms *MongoStorage) RemoveAccount(key string) error {
|
||||
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetCdrStatsQueue(key string) (sq *CDRStatsQueue, err error) {
|
||||
func (ms *MongoStorage) GetCdrStatsQueueDrv(key string) (sq *CDRStatsQueue, err error) {
|
||||
var result struct {
|
||||
Key string
|
||||
Value *CDRStatsQueue
|
||||
@@ -1047,7 +1047,7 @@ func (ms *MongoStorage) GetCdrStatsQueue(key string) (sq *CDRStatsQueue, err err
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetCdrStatsQueue(sq *CDRStatsQueue) (err error) {
|
||||
func (ms *MongoStorage) SetCdrStatsQueueDrv(sq *CDRStatsQueue) (err error) {
|
||||
session, col := ms.conn(colStq)
|
||||
defer session.Close()
|
||||
_, err = col.Upsert(bson.M{"key": sq.GetId()}, &struct {
|
||||
|
||||
@@ -588,7 +588,7 @@ func (rs *RedisStorage) RemoveAccount(key string) (err error) {
|
||||
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetCdrStatsQueue(key string) (sq *CDRStatsQueue, err error) {
|
||||
func (rs *RedisStorage) GetCdrStatsQueueDrv(key string) (sq *CDRStatsQueue, err error) {
|
||||
var values []byte
|
||||
if values, err = rs.Cmd("GET", utils.CDR_STATS_QUEUE_PREFIX+key).Bytes(); err != nil {
|
||||
if err == redis.ErrRespNil { // did not find the destination
|
||||
@@ -603,7 +603,7 @@ func (rs *RedisStorage) GetCdrStatsQueue(key string) (sq *CDRStatsQueue, err err
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetCdrStatsQueue(sq *CDRStatsQueue) (err error) {
|
||||
func (rs *RedisStorage) SetCdrStatsQueueDrv(sq *CDRStatsQueue) (err error) {
|
||||
var result []byte
|
||||
if result, err = rs.ms.Marshal(sq); err != nil {
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user