From aa9ba31e6c6b62f95b81428dbed347ea8f0b4fa7 Mon Sep 17 00:00:00 2001 From: TeoV Date: Tue, 7 Nov 2017 10:16:45 +0200 Subject: [PATCH] Add Get/Set/GetAll CdrStats in DataManager --- engine/cdrstats_queue.go | 2 +- engine/cdrstatsiface.go | 8 ++++---- engine/datamanager.go | 12 ++++++++++++ engine/loader_it_test.go | 2 +- engine/onstor_it_test.go | 8 ++++---- engine/storage_interface.go | 6 +++--- engine/storage_map.go | 6 +++--- engine/storage_mongo_datadb.go | 6 +++--- engine/storage_redis.go | 6 +++--- engine/tp_reader.go | 4 ++-- 10 files changed, 36 insertions(+), 24 deletions(-) diff --git a/engine/cdrstats_queue.go b/engine/cdrstats_queue.go index 90c81fe8d..ea5213a2d 100644 --- a/engine/cdrstats_queue.go +++ b/engine/cdrstats_queue.go @@ -99,7 +99,7 @@ func (sq *CDRStatsQueue) Save(db DataDB) { defer sq.mux.Unlock() if sq.dirty { // save the conf - if err := db.SetCdrStats(sq.conf); err != nil { + if err := db.SetCdrStatsDrv(sq.conf); err != nil { utils.Logger.Err(fmt.Sprintf("Error saving cdr stats id %s: %v", sq.conf.Id, err)) return } diff --git a/engine/cdrstatsiface.go b/engine/cdrstatsiface.go index 6cdc8bb8a..1f0ade533 100644 --- a/engine/cdrstatsiface.go +++ b/engine/cdrstatsiface.go @@ -85,7 +85,7 @@ func (svr *queueSaver) stop() { func NewStats(dm *DataManager, saveInterval time.Duration) *Stats { cdrStats := &Stats{dm: dm, defaultSaveInterval: saveInterval} - if css, err := dm.DataDB().GetAllCdrStats(); err == nil { + if css, err := dm.GetAllCdrStats(); err == nil { cdrStats.UpdateQueues(css, nil) } else { utils.Logger.Err(fmt.Sprintf("Cannot load cdr stats: %v", err)) @@ -158,7 +158,7 @@ func (s *Stats) AddQueue(cs *CdrStats, out *int) error { s.queues[cs.Id] = sq } // save the conf - if err := s.dm.DataDB().SetCdrStats(cs); err != nil { + if err := s.dm.SetCdrStats(cs); err != nil { return err } if _, exists = s.queueSavers[sq.GetId()]; !exists { @@ -185,14 +185,14 @@ func (s *Stats) RemoveQueue(qID string, out *int) error { func (s *Stats) ReloadQueues(ids []string, out *int) error { if len(ids) == 0 { - if css, err := s.dm.DataDB().GetAllCdrStats(); err == nil { + if css, err := s.dm.GetAllCdrStats(); err == nil { s.UpdateQueues(css, nil) } else { return fmt.Errorf("Cannot load cdr stats: %v", err) } } for _, id := range ids { - if cs, err := s.dm.DataDB().GetCdrStats(id); err == nil { + if cs, err := s.dm.GetCdrStats(id); err == nil { s.AddQueue(cs, nil) } else { return err diff --git a/engine/datamanager.go b/engine/datamanager.go index 0ac5cff90..1341fcf34 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -797,3 +797,15 @@ func (dm *DataManager) GetCdrStatsQueue(key string) (sq *CDRStatsQueue, err erro func (dm *DataManager) SetCdrStatsQueue(sq *CDRStatsQueue) (err error) { return dm.DataDB().SetCdrStatsQueueDrv(sq) } + +func (dm *DataManager) SetCdrStats(cs *CdrStats) error { + return dm.DataDB().SetCdrStatsDrv(cs) +} + +func (dm *DataManager) GetCdrStats(key string) (cs *CdrStats, err error) { + return dm.DataDB().GetCdrStatsDrv(key) +} + +func (dm *DataManager) GetAllCdrStats() (css []*CdrStats, err error) { + return dm.DataDB().GetAllCdrStatsDrv() +} diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go index c5ffc920c..2ca9e484f 100755 --- a/engine/loader_it_test.go +++ b/engine/loader_it_test.go @@ -281,7 +281,7 @@ func TestLoaderITWriteToDatabase(t *testing.T) { } for k, sq := range loader.cdrStats { - rcv, err := loader.dm.DataDB().GetCdrStats(k) + rcv, err := loader.dm.GetCdrStats(k) // t.Log(utils.ToIJSON(sq)) // t.Log(utils.ToIJSON(rcv)) t.Log(k) diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index f71a5af6f..d64345443 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -1246,18 +1246,18 @@ func testOnStorITCRUDLCR(t *testing.T) { func testOnStorITCRUDCdrStats(t *testing.T) { cdrs := &CdrStats{Metrics: []string{ASR, PDD, ACD, TCD, ACC, TCC, DDC}} - if _, rcvErr := onStor.DataDB().GetCdrStats(""); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetCdrStats(""); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if err := onStor.DataDB().SetCdrStats(cdrs); err != nil { + if err := onStor.SetCdrStats(cdrs); err != nil { t.Error(err) } - if rcv, err := onStor.DataDB().GetCdrStats(""); err != nil { + if rcv, err := onStor.GetCdrStats(""); err != nil { t.Error(err) } else if !reflect.DeepEqual(cdrs.Metrics, rcv.Metrics) { t.Errorf("Expecting: %v, received: %v", cdrs.Metrics, rcv.Metrics) } - if rcv, err := onStor.DataDB().GetAllCdrStats(); err != nil { + if rcv, err := onStor.GetAllCdrStats(); err != nil { t.Error(err) } else if !reflect.DeepEqual([]*CdrStats{cdrs}[0].Metrics, rcv[0].Metrics) { t.Errorf("Expecting: %v, received: %v", []*CdrStats{cdrs}[0].Metrics, rcv[0].Metrics) diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 7af81391a..40c389626 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -61,9 +61,9 @@ type DataDB interface { UpdateReverseDestination(*Destination, *Destination, string) error GetLCRDrv(string) (*LCR, error) SetLCRDrv(*LCR) error - SetCdrStats(*CdrStats) error - GetCdrStats(string) (*CdrStats, error) - GetAllCdrStats() ([]*CdrStats, error) + SetCdrStatsDrv(*CdrStats) error + GetCdrStatsDrv(string) (*CdrStats, error) + GetAllCdrStatsDrv() ([]*CdrStats, error) GetDerivedChargersDrv(string) (*utils.DerivedChargers, error) SetDerivedChargers(string, *utils.DerivedChargers, string) error GetActionsDrv(string) (Actions, error) diff --git a/engine/storage_map.go b/engine/storage_map.go index e2116c6e8..39c76ffad 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -994,7 +994,7 @@ func (ms *MapStorage) SetDerivedChargers(key string, dcs *utils.DerivedChargers, return err } -func (ms *MapStorage) SetCdrStats(cs *CdrStats) error { +func (ms *MapStorage) SetCdrStatsDrv(cs *CdrStats) error { ms.mu.Lock() defer ms.mu.Unlock() result, err := ms.ms.Marshal(cs) @@ -1002,7 +1002,7 @@ func (ms *MapStorage) SetCdrStats(cs *CdrStats) error { return err } -func (ms *MapStorage) GetCdrStats(key string) (cs *CdrStats, err error) { +func (ms *MapStorage) GetCdrStatsDrv(key string) (cs *CdrStats, err error) { ms.mu.RLock() defer ms.mu.RUnlock() if values, ok := ms.dict[utils.CDR_STATS_PREFIX+key]; ok { @@ -1013,7 +1013,7 @@ func (ms *MapStorage) GetCdrStats(key string) (cs *CdrStats, err error) { return } -func (ms *MapStorage) GetAllCdrStats() (css []*CdrStats, err error) { +func (ms *MapStorage) GetAllCdrStatsDrv() (css []*CdrStats, err error) { ms.mu.RLock() defer ms.mu.RUnlock() for key, value := range ms.dict { diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index fe08ad980..13895ac9c 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1635,14 +1635,14 @@ func (ms *MongoStorage) SetDerivedChargers(key string, dcs *utils.DerivedCharger return err } -func (ms *MongoStorage) SetCdrStats(cs *CdrStats) error { +func (ms *MongoStorage) SetCdrStatsDrv(cs *CdrStats) error { session, col := ms.conn(colCrs) defer session.Close() _, err := col.Upsert(bson.M{"id": cs.Id}, cs) return err } -func (ms *MongoStorage) GetCdrStats(key string) (cs *CdrStats, err error) { +func (ms *MongoStorage) GetCdrStatsDrv(key string) (cs *CdrStats, err error) { cs = new(CdrStats) session, col := ms.conn(colCrs) defer session.Close() @@ -1655,7 +1655,7 @@ func (ms *MongoStorage) GetCdrStats(key string) (cs *CdrStats, err error) { return } -func (ms *MongoStorage) GetAllCdrStats() (css []*CdrStats, err error) { +func (ms *MongoStorage) GetAllCdrStatsDrv() (css []*CdrStats, err error) { session, col := ms.conn(colCrs) defer session.Close() iter := col.Find(nil).Iter() diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 9c86f4ca5..1c350ee6e 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1120,7 +1120,7 @@ func (rs *RedisStorage) SetDerivedChargers(key string, dcs *utils.DerivedCharger return } -func (rs *RedisStorage) SetCdrStats(cs *CdrStats) error { +func (rs *RedisStorage) SetCdrStatsDrv(cs *CdrStats) error { marshaled, err := rs.ms.Marshal(cs) if err != nil { return err @@ -1128,7 +1128,7 @@ func (rs *RedisStorage) SetCdrStats(cs *CdrStats) error { return rs.Cmd("SET", utils.CDR_STATS_PREFIX+cs.Id, marshaled).Err } -func (rs *RedisStorage) GetCdrStats(key string) (cs *CdrStats, err error) { +func (rs *RedisStorage) GetCdrStatsDrv(key string) (cs *CdrStats, err error) { var values []byte if values, err = rs.Cmd("GET", utils.CDR_STATS_PREFIX+key).Bytes(); err != nil { if err == redis.ErrRespNil { // did not find the destination @@ -1142,7 +1142,7 @@ func (rs *RedisStorage) GetCdrStats(key string) (cs *CdrStats, err error) { return } -func (rs *RedisStorage) GetAllCdrStats() (css []*CdrStats, err error) { +func (rs *RedisStorage) GetAllCdrStatsDrv() (css []*CdrStats, err error) { keys, err := rs.Cmd("KEYS", utils.CDR_STATS_PREFIX+"*").List() if err != nil { return nil, err diff --git a/engine/tp_reader.go b/engine/tp_reader.go index e445154b8..05267871f 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -1462,7 +1462,7 @@ func (tpr *TpReader) LoadCdrStatsFiltered(tag string, save bool) (err error) { } } for _, stat := range tpr.cdrStats { - if err := tpr.dm.DataDB().SetCdrStats(stat); err != nil { + if err := tpr.dm.SetCdrStats(stat); err != nil { return err } } @@ -2015,7 +2015,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("CDR Stats Queues:") } for _, sq := range tpr.cdrStats { - err = tpr.dm.DataDB().SetCdrStats(sq) + err = tpr.dm.SetCdrStats(sq) if err != nil { return err }