small fix

This commit is contained in:
Edwardro22
2017-08-03 13:49:37 +03:00
parent fae33969ea
commit 71ef085e12
8 changed files with 46 additions and 82 deletions

View File

@@ -368,23 +368,23 @@ CREATE INDEX tpusers_idx ON tp_users (tpid,tenant,user_name);
DROP TABLE IF EXISTS tp_aliases;
CREATE TABLE tp_aliases (
"id" SERIAL PRIMARY KEY,
"tpid" varchar(64) NOT NULL,
"direction" varchar(8) NOT NULL,
"tenant" varchar(64) NOT NULL,
"category" varchar(64) NOT NULL,
"account" varchar(64) NOT NULL,
"subject" varchar(64) NOT NULL,
"destination_id" varchar(64) NOT NULL,
"context" varchar(64) NOT NULL,
"target" varchar(64) NOT NULL,
"original" varchar(64) NOT NULL,
"alias" varchar(64) NOT NULL,
"weight" NUMERIC(8,2) NOT NULL,
"created_at" TIMESTAMP WITH TIME ZONE
id SERIAL PRIMARY KEY,
tpid varchar(64) NOT NULL,
direction varchar(8) NOT NULL,
tenant varchar(64) NOT NULL,
category varchar(64) NOT NULL,
account varchar(64) NOT NULL,
subject varchar(64) NOT NULL,
destination_id varchar(64) NOT NULL,
context varchar(64) NOT NULL,
target varchar(64) NOT NULL,
original varchar(64) NOT NULL,
alias varchar(64) NOT NULL,
weight NUMERIC(8,2) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE
);
CREATE INDEX tpaliases_tpid_idx ON tp_aliases (tpid);
CREATE INDEX tpaliases_idx ON tp_aliases ("tpid","direction","tenant","category","account","subject","context","target");
CREATE INDEX tpaliases_idx ON tp_aliases (tpid,direction,tenant,category,account,subject,context,target);
--

View File

@@ -324,7 +324,7 @@ func TestLoaderITWriteToDatabase(t *testing.T) {
}
for k, st := range loader.stats {
rcv, err := loader.dataStorage.GetStatsQueue(k, true, utils.NonTransactional)
rcv, err := loader.dataStorage.GetStatsQueue(k)
if err != nil {
t.Error("Failed GetStatsQueue: ", err.Error())
}

View File

@@ -1851,29 +1851,29 @@ func testOnStorITCRUDStatsQueue(t *testing.T) {
Store: true,
Thresholds: []string{},
}
if _, rcvErr := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
if _, rcvErr := onStor.GetStatsQueue(sq.ID); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
if err := onStor.SetStatsQueue(sq); err != nil {
t.Error(err)
}
if rcv, err := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); err != nil {
if rcv, err := onStor.GetStatsQueue(sq.ID); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(sq, rcv) {
t.Errorf("Expecting: %v, received: %v", sq, rcv)
}
if rcv, err := onStor.GetStatsQueue(sq.ID, false, utils.NonTransactional); err != nil {
if rcv, err := onStor.GetStatsQueue(sq.ID); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(sq, rcv) {
t.Errorf("Expecting: %v, received: %v", sq, rcv)
}
if err := onStor.RemStatsQueue(sq.ID, utils.NonTransactional); err != nil {
if err := onStor.RemStatsQueue(sq.ID); err != nil {
t.Error(err)
}
if _, rcvErr := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
if _, rcvErr := onStor.GetStatsQueue(sq.ID); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
if _, rcvErr := onStor.GetStatsQueue(sq.ID, false, utils.NonTransactional); rcvErr != utils.ErrNotFound {
if _, rcvErr := onStor.GetStatsQueue(sq.ID); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
}

4
engine/storage_interface.go Normal file → Executable file
View File

@@ -111,9 +111,9 @@ type DataDB interface {
GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error)
SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error)
MatchReqFilterIndex(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error)
GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error)
GetStatsQueue(sqID string) (sq *StatsQueue, err error)
SetStatsQueue(sq *StatsQueue) (err error)
RemStatsQueue(sqID string, transactionID string) (err error)
RemStatsQueue(sqID string) (err error)
GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error)
SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error)
RemSQStoredMetrics(sqID string) (err error)

View File

@@ -1474,22 +1474,13 @@ func (ms *MapStorage) RemoveVersions(vrs Versions) (err error) {
return
}
// GetStatsQueue retrieves a StatsQueue from dataDB/cache
func (ms *MapStorage) GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) {
// GetStatsQueue retrieves a StatsQueue from dataDB
func (ms *MapStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key := utils.StatsQueuePrefix + sqID
if !skipCache {
if x, ok := cache.Get(key); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return x.(*StatsQueue), nil
}
}
values, ok := ms.dict[key]
if !ok {
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
return nil, utils.ErrNotFound
}
err = ms.ms.Unmarshal(values, &sq)
@@ -1501,7 +1492,6 @@ func (ms *MapStorage) GetStatsQueue(sqID string, skipCache bool, transactionID s
return nil, err
}
}
cache.Set(key, sq, cacheCommit(transactionID), transactionID)
return
}
@@ -1517,13 +1507,12 @@ func (ms *MapStorage) SetStatsQueue(sq *StatsQueue) (err error) {
return
}
// RemStatsQueue removes a StatsQueue from dataDB/cache
func (ms *MapStorage) RemStatsQueue(sqID string, transactionID string) (err error) {
// RemStatsQueue removes a StatsQueue from dataDB
func (ms *MapStorage) RemStatsQueue(sqID string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
key := utils.StatsQueuePrefix + sqID
delete(ms.dict, key)
cache.RemKey(key, cacheCommit(transactionID), transactionID)
return
}
@@ -1560,7 +1549,7 @@ func (ms *MapStorage) RemSQStoredMetrics(sqID string) (err error) {
return
}
// GetStatsQueue retrieves a ThresholdCfg from dataDB/cache
// GetThresholdCfg retrieves a ThresholdCfg from dataDB/cache
func (ms *MapStorage) GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
@@ -1591,7 +1580,7 @@ func (ms *MapStorage) GetThresholdCfg(ID string, skipCache bool, transactionID s
return
}
// SetStatsQueue stores a ThresholdCfg into DataDB
// SetThresholdCfg stores a ThresholdCfg into DataDB
func (ms *MapStorage) SetThresholdCfg(th *ThresholdCfg) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
@@ -1603,7 +1592,7 @@ func (ms *MapStorage) SetThresholdCfg(th *ThresholdCfg) (err error) {
return
}
// RemStatsQueue removes a ThresholdCfg from dataDB/cache
// RemThresholdCfg removes a ThresholdCfg from dataDB/cache
func (ms *MapStorage) RemThresholdCfg(sqID string, transactionID string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()

View File

@@ -1984,24 +1984,13 @@ func (ms *MongoStorage) MatchReqFilterIndex(dbKey, fldName, fldVal string) (item
return
}
// GetStatsQueue retrieves a StatsQueue from dataDB/cache
func (ms *MongoStorage) GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) {
cacheKey := utils.StatsQueuePrefix + sqID
if !skipCache {
if x, ok := cache.Get(cacheKey); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return x.(*StatsQueue), nil
}
}
// GetStatsQueue retrieves a StatsQueue from dataDB
func (ms *MongoStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
session, col := ms.conn(utils.StatsQueuePrefix)
defer session.Close()
sq = new(StatsQueue)
cCommit := cacheCommit(transactionID)
if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil {
if err == mgo.ErrNotFound {
cache.Set(cacheKey, nil, cCommit, transactionID)
err = utils.ErrNotFound
}
return nil, err
@@ -2011,7 +2000,6 @@ func (ms *MongoStorage) GetStatsQueue(sqID string, skipCache bool, transactionID
return
}
}
cache.Set(cacheKey, sq, cCommit, transactionID)
return
}
@@ -2023,15 +2011,13 @@ func (ms *MongoStorage) SetStatsQueue(sq *StatsQueue) (err error) {
return
}
// RemStatsQueue removes a StatsQueue from dataDB/cache
func (ms *MongoStorage) RemStatsQueue(sqID string, transactionID string) (err error) {
// RemStatsQueue removes a StatsQueue from dataDB
func (ms *MongoStorage) RemStatsQueue(sqID string) (err error) {
session, col := ms.conn(utils.StatsQueuePrefix)
key := utils.StatsQueuePrefix + sqID
err = col.Remove(bson.M{"id": sqID})
if err != nil {
return err
}
cache.RemKey(key, cacheCommit(transactionID), transactionID)
session.Close()
return
}
@@ -2065,7 +2051,7 @@ func (ms *MongoStorage) RemSQStoredMetrics(sqmID string) (err error) {
return err
}
// GetStatsQueue retrieves a ThresholdCfg from dataDB/cache
// GetThresholdCfg retrieves a ThresholdCfg from dataDB/cache
func (ms *MongoStorage) GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error) {
cacheKey := utils.ThresholdCfgPrefix + ID
if !skipCache {
@@ -2096,7 +2082,7 @@ func (ms *MongoStorage) GetThresholdCfg(ID string, skipCache bool, transactionID
return
}
// SetStatsQueue stores a ThresholdCfg into DataDB
// SetThresholdCfg stores a ThresholdCfg into DataDB
func (ms *MongoStorage) SetThresholdCfg(th *ThresholdCfg) (err error) {
session, col := ms.conn(utils.ThresholdCfgPrefix)
defer session.Close()
@@ -2104,7 +2090,7 @@ func (ms *MongoStorage) SetThresholdCfg(th *ThresholdCfg) (err error) {
return
}
// RemStatsQueue removes a ThresholdCfg from dataDB/cache
// RemThresholdCfg removes a ThresholdCfg from dataDB/cache
func (ms *MongoStorage) RemThresholdCfg(ID string, transactionID string) (err error) {
session, col := ms.conn(utils.ThresholdCfgPrefix)
key := utils.ThresholdCfgPrefix + ID

View File

@@ -1540,21 +1540,12 @@ func (rs *RedisStorage) RemoveVersions(vrs Versions) (err error) {
return
}
// GetStatsQueue retrieves a StatsQueue from dataDB/cache
func (rs *RedisStorage) GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) {
// GetStatsQueue retrieves a StatsQueue from dataDB
func (rs *RedisStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
key := utils.StatsQueuePrefix + sqID
if !skipCache {
if x, ok := cache.Get(key); ok {
if x == nil {
return nil, utils.ErrNotFound
}
return x.(*StatsQueue), nil
}
}
var values []byte
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
if err == redis.ErrRespNil {
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
err = utils.ErrNotFound
}
return
@@ -1567,7 +1558,6 @@ func (rs *RedisStorage) GetStatsQueue(sqID string, skipCache bool, transactionID
return
}
}
cache.Set(key, sq, cacheCommit(transactionID), transactionID)
return
}
@@ -1581,11 +1571,10 @@ func (rs *RedisStorage) SetStatsQueue(sq *StatsQueue) (err error) {
return rs.Cmd("SET", utils.StatsQueuePrefix+sq.ID, result).Err
}
// RemStatsQueue removes a StatsQueue from dataDB/cache
func (rs *RedisStorage) RemStatsQueue(sqID string, transactionID string) (err error) {
// RemStatsQueue removes a StatsQueue from dataDB
func (rs *RedisStorage) RemStatsQueue(sqID string) (err error) {
key := utils.StatsQueuePrefix + sqID
err = rs.Cmd("DEL", key).Err
cache.RemKey(key, cacheCommit(transactionID), transactionID)
return
}
@@ -1624,7 +1613,7 @@ func (rs *RedisStorage) RemSQStoredMetrics(sqmID string) (err error) {
return
}
// GetStatsQueue retrieves a ThresholdCfg from dataDB/cache
// GetThresholdCfg retrieves a ThresholdCfg from dataDB/cache
func (rs *RedisStorage) GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error) {
key := utils.ThresholdCfgPrefix + ID
if !skipCache {
@@ -1655,7 +1644,7 @@ func (rs *RedisStorage) GetThresholdCfg(ID string, skipCache bool, transactionID
return
}
// SetStatsQueue stores a ThresholdCfg into DataDB
// SetThresholdCfg stores a ThresholdCfg into DataDB
func (rs *RedisStorage) SetThresholdCfg(th *ThresholdCfg) (err error) {
var result []byte
result, err = rs.ms.Marshal(th)
@@ -1665,7 +1654,7 @@ func (rs *RedisStorage) SetThresholdCfg(th *ThresholdCfg) (err error) {
return rs.Cmd("SET", utils.ThresholdCfgPrefix+th.ID, result).Err
}
// RemStatsQueue removes a ThresholdCfg from dataDB/cache
// RemThresholdCfg removes a ThresholdCfg from dataDB/cache
func (rs *RedisStorage) RemThresholdCfg(ID string, transactionID string) (err error) {
key := utils.ThresholdCfgPrefix + ID
err = rs.Cmd("DEL", key).Err

2
stats/service.go Normal file → Executable file
View File

@@ -42,7 +42,7 @@ func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval tim
}
ss.stInsts = make(StatsInstances, len(sqPrfxs))
for i, prfx := range sqPrfxs {
sq, err := dataDB.GetStatsQueue(prfx[len(utils.StatsQueuePrefix):], false, utils.NonTransactional)
sq, err := dataDB.GetStatsQueue(prfx[len(utils.StatsQueuePrefix):])
if err != nil {
return nil, err
}