mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 20:59:53 +05:00
Add Get/Set/GetAll CdrStats in DataManager
This commit is contained in:
committed by
Dan Christian Bogos
parent
eb629dfe70
commit
aa9ba31e6c
@@ -99,7 +99,7 @@ func (sq *CDRStatsQueue) Save(db DataDB) {
|
||||
defer sq.mux.Unlock()
|
||||
if sq.dirty {
|
||||
// save the conf
|
||||
if err := db.SetCdrStats(sq.conf); err != nil {
|
||||
if err := db.SetCdrStatsDrv(sq.conf); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("Error saving cdr stats id %s: %v", sq.conf.Id, err))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -85,7 +85,7 @@ func (svr *queueSaver) stop() {
|
||||
|
||||
func NewStats(dm *DataManager, saveInterval time.Duration) *Stats {
|
||||
cdrStats := &Stats{dm: dm, defaultSaveInterval: saveInterval}
|
||||
if css, err := dm.DataDB().GetAllCdrStats(); err == nil {
|
||||
if css, err := dm.GetAllCdrStats(); err == nil {
|
||||
cdrStats.UpdateQueues(css, nil)
|
||||
} else {
|
||||
utils.Logger.Err(fmt.Sprintf("Cannot load cdr stats: %v", err))
|
||||
@@ -158,7 +158,7 @@ func (s *Stats) AddQueue(cs *CdrStats, out *int) error {
|
||||
s.queues[cs.Id] = sq
|
||||
}
|
||||
// save the conf
|
||||
if err := s.dm.DataDB().SetCdrStats(cs); err != nil {
|
||||
if err := s.dm.SetCdrStats(cs); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, exists = s.queueSavers[sq.GetId()]; !exists {
|
||||
@@ -185,14 +185,14 @@ func (s *Stats) RemoveQueue(qID string, out *int) error {
|
||||
|
||||
func (s *Stats) ReloadQueues(ids []string, out *int) error {
|
||||
if len(ids) == 0 {
|
||||
if css, err := s.dm.DataDB().GetAllCdrStats(); err == nil {
|
||||
if css, err := s.dm.GetAllCdrStats(); err == nil {
|
||||
s.UpdateQueues(css, nil)
|
||||
} else {
|
||||
return fmt.Errorf("Cannot load cdr stats: %v", err)
|
||||
}
|
||||
}
|
||||
for _, id := range ids {
|
||||
if cs, err := s.dm.DataDB().GetCdrStats(id); err == nil {
|
||||
if cs, err := s.dm.GetCdrStats(id); err == nil {
|
||||
s.AddQueue(cs, nil)
|
||||
} else {
|
||||
return err
|
||||
|
||||
@@ -797,3 +797,15 @@ func (dm *DataManager) GetCdrStatsQueue(key string) (sq *CDRStatsQueue, err erro
|
||||
func (dm *DataManager) SetCdrStatsQueue(sq *CDRStatsQueue) (err error) {
|
||||
return dm.DataDB().SetCdrStatsQueueDrv(sq)
|
||||
}
|
||||
|
||||
func (dm *DataManager) SetCdrStats(cs *CdrStats) error {
|
||||
return dm.DataDB().SetCdrStatsDrv(cs)
|
||||
}
|
||||
|
||||
func (dm *DataManager) GetCdrStats(key string) (cs *CdrStats, err error) {
|
||||
return dm.DataDB().GetCdrStatsDrv(key)
|
||||
}
|
||||
|
||||
func (dm *DataManager) GetAllCdrStats() (css []*CdrStats, err error) {
|
||||
return dm.DataDB().GetAllCdrStatsDrv()
|
||||
}
|
||||
|
||||
@@ -281,7 +281,7 @@ func TestLoaderITWriteToDatabase(t *testing.T) {
|
||||
}
|
||||
|
||||
for k, sq := range loader.cdrStats {
|
||||
rcv, err := loader.dm.DataDB().GetCdrStats(k)
|
||||
rcv, err := loader.dm.GetCdrStats(k)
|
||||
// t.Log(utils.ToIJSON(sq))
|
||||
// t.Log(utils.ToIJSON(rcv))
|
||||
t.Log(k)
|
||||
|
||||
@@ -1246,18 +1246,18 @@ func testOnStorITCRUDLCR(t *testing.T) {
|
||||
func testOnStorITCRUDCdrStats(t *testing.T) {
|
||||
cdrs := &CdrStats{Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}}
|
||||
|
||||
if _, rcvErr := onStor.DataDB().GetCdrStats(""); rcvErr != utils.ErrNotFound {
|
||||
if _, rcvErr := onStor.GetCdrStats(""); rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
if err := onStor.DataDB().SetCdrStats(cdrs); err != nil {
|
||||
if err := onStor.SetCdrStats(cdrs); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if rcv, err := onStor.DataDB().GetCdrStats(""); err != nil {
|
||||
if rcv, err := onStor.GetCdrStats(""); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(cdrs.Metrics, rcv.Metrics) {
|
||||
t.Errorf("Expecting: %v, received: %v", cdrs.Metrics, rcv.Metrics)
|
||||
}
|
||||
if rcv, err := onStor.DataDB().GetAllCdrStats(); err != nil {
|
||||
if rcv, err := onStor.GetAllCdrStats(); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual([]*CdrStats{cdrs}[0].Metrics, rcv[0].Metrics) {
|
||||
t.Errorf("Expecting: %v, received: %v", []*CdrStats{cdrs}[0].Metrics, rcv[0].Metrics)
|
||||
|
||||
@@ -61,9 +61,9 @@ type DataDB interface {
|
||||
UpdateReverseDestination(*Destination, *Destination, string) error
|
||||
GetLCRDrv(string) (*LCR, error)
|
||||
SetLCRDrv(*LCR) error
|
||||
SetCdrStats(*CdrStats) error
|
||||
GetCdrStats(string) (*CdrStats, error)
|
||||
GetAllCdrStats() ([]*CdrStats, error)
|
||||
SetCdrStatsDrv(*CdrStats) error
|
||||
GetCdrStatsDrv(string) (*CdrStats, error)
|
||||
GetAllCdrStatsDrv() ([]*CdrStats, error)
|
||||
GetDerivedChargersDrv(string) (*utils.DerivedChargers, error)
|
||||
SetDerivedChargers(string, *utils.DerivedChargers, string) error
|
||||
GetActionsDrv(string) (Actions, error)
|
||||
|
||||
@@ -994,7 +994,7 @@ func (ms *MapStorage) SetDerivedChargers(key string, dcs *utils.DerivedChargers,
|
||||
return err
|
||||
}
|
||||
|
||||
func (ms *MapStorage) SetCdrStats(cs *CdrStats) error {
|
||||
func (ms *MapStorage) SetCdrStatsDrv(cs *CdrStats) error {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
result, err := ms.ms.Marshal(cs)
|
||||
@@ -1002,7 +1002,7 @@ func (ms *MapStorage) SetCdrStats(cs *CdrStats) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetCdrStats(key string) (cs *CdrStats, err error) {
|
||||
func (ms *MapStorage) GetCdrStatsDrv(key string) (cs *CdrStats, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
if values, ok := ms.dict[utils.CDR_STATS_PREFIX+key]; ok {
|
||||
@@ -1013,7 +1013,7 @@ func (ms *MapStorage) GetCdrStats(key string) (cs *CdrStats, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetAllCdrStats() (css []*CdrStats, err error) {
|
||||
func (ms *MapStorage) GetAllCdrStatsDrv() (css []*CdrStats, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
for key, value := range ms.dict {
|
||||
|
||||
@@ -1635,14 +1635,14 @@ func (ms *MongoStorage) SetDerivedChargers(key string, dcs *utils.DerivedCharger
|
||||
return err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetCdrStats(cs *CdrStats) error {
|
||||
func (ms *MongoStorage) SetCdrStatsDrv(cs *CdrStats) error {
|
||||
session, col := ms.conn(colCrs)
|
||||
defer session.Close()
|
||||
_, err := col.Upsert(bson.M{"id": cs.Id}, cs)
|
||||
return err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetCdrStats(key string) (cs *CdrStats, err error) {
|
||||
func (ms *MongoStorage) GetCdrStatsDrv(key string) (cs *CdrStats, err error) {
|
||||
cs = new(CdrStats)
|
||||
session, col := ms.conn(colCrs)
|
||||
defer session.Close()
|
||||
@@ -1655,7 +1655,7 @@ func (ms *MongoStorage) GetCdrStats(key string) (cs *CdrStats, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetAllCdrStats() (css []*CdrStats, err error) {
|
||||
func (ms *MongoStorage) GetAllCdrStatsDrv() (css []*CdrStats, err error) {
|
||||
session, col := ms.conn(colCrs)
|
||||
defer session.Close()
|
||||
iter := col.Find(nil).Iter()
|
||||
|
||||
@@ -1120,7 +1120,7 @@ func (rs *RedisStorage) SetDerivedChargers(key string, dcs *utils.DerivedCharger
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetCdrStats(cs *CdrStats) error {
|
||||
func (rs *RedisStorage) SetCdrStatsDrv(cs *CdrStats) error {
|
||||
marshaled, err := rs.ms.Marshal(cs)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -1128,7 +1128,7 @@ func (rs *RedisStorage) SetCdrStats(cs *CdrStats) error {
|
||||
return rs.Cmd("SET", utils.CDR_STATS_PREFIX+cs.Id, marshaled).Err
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetCdrStats(key string) (cs *CdrStats, err error) {
|
||||
func (rs *RedisStorage) GetCdrStatsDrv(key string) (cs *CdrStats, err error) {
|
||||
var values []byte
|
||||
if values, err = rs.Cmd("GET", utils.CDR_STATS_PREFIX+key).Bytes(); err != nil {
|
||||
if err == redis.ErrRespNil { // did not find the destination
|
||||
@@ -1142,7 +1142,7 @@ func (rs *RedisStorage) GetCdrStats(key string) (cs *CdrStats, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetAllCdrStats() (css []*CdrStats, err error) {
|
||||
func (rs *RedisStorage) GetAllCdrStatsDrv() (css []*CdrStats, err error) {
|
||||
keys, err := rs.Cmd("KEYS", utils.CDR_STATS_PREFIX+"*").List()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -1462,7 +1462,7 @@ func (tpr *TpReader) LoadCdrStatsFiltered(tag string, save bool) (err error) {
|
||||
}
|
||||
}
|
||||
for _, stat := range tpr.cdrStats {
|
||||
if err := tpr.dm.DataDB().SetCdrStats(stat); err != nil {
|
||||
if err := tpr.dm.SetCdrStats(stat); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -2015,7 +2015,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
|
||||
log.Print("CDR Stats Queues:")
|
||||
}
|
||||
for _, sq := range tpr.cdrStats {
|
||||
err = tpr.dm.DataDB().SetCdrStats(sq)
|
||||
err = tpr.dm.SetCdrStats(sq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user