diff --git a/data/storage/postgres/create_tariffplan_tables.sql b/data/storage/postgres/create_tariffplan_tables.sql index 5bc43b9c0..fd4eb3ee9 100644 --- a/data/storage/postgres/create_tariffplan_tables.sql +++ b/data/storage/postgres/create_tariffplan_tables.sql @@ -368,23 +368,23 @@ CREATE INDEX tpusers_idx ON tp_users (tpid,tenant,user_name); DROP TABLE IF EXISTS tp_aliases; CREATE TABLE tp_aliases ( - "id" SERIAL PRIMARY KEY, - "tpid" varchar(64) NOT NULL, - "direction" varchar(8) NOT NULL, - "tenant" varchar(64) NOT NULL, - "category" varchar(64) NOT NULL, - "account" varchar(64) NOT NULL, - "subject" varchar(64) NOT NULL, - "destination_id" varchar(64) NOT NULL, - "context" varchar(64) NOT NULL, - "target" varchar(64) NOT NULL, - "original" varchar(64) NOT NULL, - "alias" varchar(64) NOT NULL, - "weight" NUMERIC(8,2) NOT NULL, - "created_at" TIMESTAMP WITH TIME ZONE + id SERIAL PRIMARY KEY, + tpid varchar(64) NOT NULL, + direction varchar(8) NOT NULL, + tenant varchar(64) NOT NULL, + category varchar(64) NOT NULL, + account varchar(64) NOT NULL, + subject varchar(64) NOT NULL, + destination_id varchar(64) NOT NULL, + context varchar(64) NOT NULL, + target varchar(64) NOT NULL, + original varchar(64) NOT NULL, + alias varchar(64) NOT NULL, + weight NUMERIC(8,2) NOT NULL, + created_at TIMESTAMP WITH TIME ZONE ); CREATE INDEX tpaliases_tpid_idx ON tp_aliases (tpid); -CREATE INDEX tpaliases_idx ON tp_aliases ("tpid","direction","tenant","category","account","subject","context","target"); +CREATE INDEX tpaliases_idx ON tp_aliases (tpid,direction,tenant,category,account,subject,context,target); -- diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go index 74620b49c..697c18397 100755 --- a/engine/loader_it_test.go +++ b/engine/loader_it_test.go @@ -324,7 +324,7 @@ func TestLoaderITWriteToDatabase(t *testing.T) { } for k, st := range loader.stats { - rcv, err := loader.dataStorage.GetStatsQueue(k, true, utils.NonTransactional) + rcv, err := loader.dataStorage.GetStatsQueue(k) if err != nil { t.Error("Failed GetStatsQueue: ", err.Error()) } diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index cb20f7c3e..71dc9589d 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -1851,29 +1851,29 @@ func testOnStorITCRUDStatsQueue(t *testing.T) { Store: true, Thresholds: []string{}, } - if _, rcvErr := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetStatsQueue(sq.ID); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } if err := onStor.SetStatsQueue(sq); err != nil { t.Error(err) } - if rcv, err := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetStatsQueue(sq.ID); err != nil { t.Error(err) } else if !reflect.DeepEqual(sq, rcv) { t.Errorf("Expecting: %v, received: %v", sq, rcv) } - if rcv, err := onStor.GetStatsQueue(sq.ID, false, utils.NonTransactional); err != nil { + if rcv, err := onStor.GetStatsQueue(sq.ID); err != nil { t.Error(err) } else if !reflect.DeepEqual(sq, rcv) { t.Errorf("Expecting: %v, received: %v", sq, rcv) } - if err := onStor.RemStatsQueue(sq.ID, utils.NonTransactional); err != nil { + if err := onStor.RemStatsQueue(sq.ID); err != nil { t.Error(err) } - if _, rcvErr := onStor.GetStatsQueue(sq.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetStatsQueue(sq.ID); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if _, rcvErr := onStor.GetStatsQueue(sq.ID, false, utils.NonTransactional); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetStatsQueue(sq.ID); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } } diff --git a/engine/storage_interface.go b/engine/storage_interface.go old mode 100644 new mode 100755 index 3afa9238d..04d0af530 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -111,9 +111,9 @@ type DataDB interface { GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) MatchReqFilterIndex(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) - GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) + GetStatsQueue(sqID string) (sq *StatsQueue, err error) SetStatsQueue(sq *StatsQueue) (err error) - RemStatsQueue(sqID string, transactionID string) (err error) + RemStatsQueue(sqID string) (err error) GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) RemSQStoredMetrics(sqID string) (err error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 75f9fb8a2..54f1451c7 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1474,22 +1474,13 @@ func (ms *MapStorage) RemoveVersions(vrs Versions) (err error) { return } -// GetStatsQueue retrieves a StatsQueue from dataDB/cache -func (ms *MapStorage) GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) { +// GetStatsQueue retrieves a StatsQueue from dataDB +func (ms *MapStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) { ms.mu.RLock() defer ms.mu.RUnlock() key := utils.StatsQueuePrefix + sqID - if !skipCache { - if x, ok := cache.Get(key); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.(*StatsQueue), nil - } - } values, ok := ms.dict[key] if !ok { - cache.Set(key, nil, cacheCommit(transactionID), transactionID) return nil, utils.ErrNotFound } err = ms.ms.Unmarshal(values, &sq) @@ -1501,7 +1492,6 @@ func (ms *MapStorage) GetStatsQueue(sqID string, skipCache bool, transactionID s return nil, err } } - cache.Set(key, sq, cacheCommit(transactionID), transactionID) return } @@ -1517,13 +1507,12 @@ func (ms *MapStorage) SetStatsQueue(sq *StatsQueue) (err error) { return } -// RemStatsQueue removes a StatsQueue from dataDB/cache -func (ms *MapStorage) RemStatsQueue(sqID string, transactionID string) (err error) { +// RemStatsQueue removes a StatsQueue from dataDB +func (ms *MapStorage) RemStatsQueue(sqID string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() key := utils.StatsQueuePrefix + sqID delete(ms.dict, key) - cache.RemKey(key, cacheCommit(transactionID), transactionID) return } @@ -1560,7 +1549,7 @@ func (ms *MapStorage) RemSQStoredMetrics(sqID string) (err error) { return } -// GetStatsQueue retrieves a ThresholdCfg from dataDB/cache +// GetThresholdCfg retrieves a ThresholdCfg from dataDB/cache func (ms *MapStorage) GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error) { ms.mu.RLock() defer ms.mu.RUnlock() @@ -1591,7 +1580,7 @@ func (ms *MapStorage) GetThresholdCfg(ID string, skipCache bool, transactionID s return } -// SetStatsQueue stores a ThresholdCfg into DataDB +// SetThresholdCfg stores a ThresholdCfg into DataDB func (ms *MapStorage) SetThresholdCfg(th *ThresholdCfg) (err error) { ms.mu.Lock() defer ms.mu.Unlock() @@ -1603,7 +1592,7 @@ func (ms *MapStorage) SetThresholdCfg(th *ThresholdCfg) (err error) { return } -// RemStatsQueue removes a ThresholdCfg from dataDB/cache +// RemThresholdCfg removes a ThresholdCfg from dataDB/cache func (ms *MapStorage) RemThresholdCfg(sqID string, transactionID string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index fd2c588e4..efc5ddb19 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1984,24 +1984,13 @@ func (ms *MongoStorage) MatchReqFilterIndex(dbKey, fldName, fldVal string) (item return } -// GetStatsQueue retrieves a StatsQueue from dataDB/cache -func (ms *MongoStorage) GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) { - cacheKey := utils.StatsQueuePrefix + sqID - if !skipCache { - if x, ok := cache.Get(cacheKey); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.(*StatsQueue), nil - } - } +// GetStatsQueue retrieves a StatsQueue from dataDB +func (ms *MongoStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) { session, col := ms.conn(utils.StatsQueuePrefix) defer session.Close() sq = new(StatsQueue) - cCommit := cacheCommit(transactionID) if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil { if err == mgo.ErrNotFound { - cache.Set(cacheKey, nil, cCommit, transactionID) err = utils.ErrNotFound } return nil, err @@ -2011,7 +2000,6 @@ func (ms *MongoStorage) GetStatsQueue(sqID string, skipCache bool, transactionID return } } - cache.Set(cacheKey, sq, cCommit, transactionID) return } @@ -2023,15 +2011,13 @@ func (ms *MongoStorage) SetStatsQueue(sq *StatsQueue) (err error) { return } -// RemStatsQueue removes a StatsQueue from dataDB/cache -func (ms *MongoStorage) RemStatsQueue(sqID string, transactionID string) (err error) { +// RemStatsQueue removes a StatsQueue from dataDB +func (ms *MongoStorage) RemStatsQueue(sqID string) (err error) { session, col := ms.conn(utils.StatsQueuePrefix) - key := utils.StatsQueuePrefix + sqID err = col.Remove(bson.M{"id": sqID}) if err != nil { return err } - cache.RemKey(key, cacheCommit(transactionID), transactionID) session.Close() return } @@ -2065,7 +2051,7 @@ func (ms *MongoStorage) RemSQStoredMetrics(sqmID string) (err error) { return err } -// GetStatsQueue retrieves a ThresholdCfg from dataDB/cache +// GetThresholdCfg retrieves a ThresholdCfg from dataDB/cache func (ms *MongoStorage) GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error) { cacheKey := utils.ThresholdCfgPrefix + ID if !skipCache { @@ -2096,7 +2082,7 @@ func (ms *MongoStorage) GetThresholdCfg(ID string, skipCache bool, transactionID return } -// SetStatsQueue stores a ThresholdCfg into DataDB +// SetThresholdCfg stores a ThresholdCfg into DataDB func (ms *MongoStorage) SetThresholdCfg(th *ThresholdCfg) (err error) { session, col := ms.conn(utils.ThresholdCfgPrefix) defer session.Close() @@ -2104,7 +2090,7 @@ func (ms *MongoStorage) SetThresholdCfg(th *ThresholdCfg) (err error) { return } -// RemStatsQueue removes a ThresholdCfg from dataDB/cache +// RemThresholdCfg removes a ThresholdCfg from dataDB/cache func (ms *MongoStorage) RemThresholdCfg(ID string, transactionID string) (err error) { session, col := ms.conn(utils.ThresholdCfgPrefix) key := utils.ThresholdCfgPrefix + ID diff --git a/engine/storage_redis.go b/engine/storage_redis.go index f116dee64..586ed4c31 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1540,21 +1540,12 @@ func (rs *RedisStorage) RemoveVersions(vrs Versions) (err error) { return } -// GetStatsQueue retrieves a StatsQueue from dataDB/cache -func (rs *RedisStorage) GetStatsQueue(sqID string, skipCache bool, transactionID string) (sq *StatsQueue, err error) { +// GetStatsQueue retrieves a StatsQueue from dataDB +func (rs *RedisStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) { key := utils.StatsQueuePrefix + sqID - if !skipCache { - if x, ok := cache.Get(key); ok { - if x == nil { - return nil, utils.ErrNotFound - } - return x.(*StatsQueue), nil - } - } var values []byte if values, err = rs.Cmd("GET", key).Bytes(); err != nil { if err == redis.ErrRespNil { - cache.Set(key, nil, cacheCommit(transactionID), transactionID) err = utils.ErrNotFound } return @@ -1567,7 +1558,6 @@ func (rs *RedisStorage) GetStatsQueue(sqID string, skipCache bool, transactionID return } } - cache.Set(key, sq, cacheCommit(transactionID), transactionID) return } @@ -1581,11 +1571,10 @@ func (rs *RedisStorage) SetStatsQueue(sq *StatsQueue) (err error) { return rs.Cmd("SET", utils.StatsQueuePrefix+sq.ID, result).Err } -// RemStatsQueue removes a StatsQueue from dataDB/cache -func (rs *RedisStorage) RemStatsQueue(sqID string, transactionID string) (err error) { +// RemStatsQueue removes a StatsQueue from dataDB +func (rs *RedisStorage) RemStatsQueue(sqID string) (err error) { key := utils.StatsQueuePrefix + sqID err = rs.Cmd("DEL", key).Err - cache.RemKey(key, cacheCommit(transactionID), transactionID) return } @@ -1624,7 +1613,7 @@ func (rs *RedisStorage) RemSQStoredMetrics(sqmID string) (err error) { return } -// GetStatsQueue retrieves a ThresholdCfg from dataDB/cache +// GetThresholdCfg retrieves a ThresholdCfg from dataDB/cache func (rs *RedisStorage) GetThresholdCfg(ID string, skipCache bool, transactionID string) (th *ThresholdCfg, err error) { key := utils.ThresholdCfgPrefix + ID if !skipCache { @@ -1655,7 +1644,7 @@ func (rs *RedisStorage) GetThresholdCfg(ID string, skipCache bool, transactionID return } -// SetStatsQueue stores a ThresholdCfg into DataDB +// SetThresholdCfg stores a ThresholdCfg into DataDB func (rs *RedisStorage) SetThresholdCfg(th *ThresholdCfg) (err error) { var result []byte result, err = rs.ms.Marshal(th) @@ -1665,7 +1654,7 @@ func (rs *RedisStorage) SetThresholdCfg(th *ThresholdCfg) (err error) { return rs.Cmd("SET", utils.ThresholdCfgPrefix+th.ID, result).Err } -// RemStatsQueue removes a ThresholdCfg from dataDB/cache +// RemThresholdCfg removes a ThresholdCfg from dataDB/cache func (rs *RedisStorage) RemThresholdCfg(ID string, transactionID string) (err error) { key := utils.ThresholdCfgPrefix + ID err = rs.Cmd("DEL", key).Err diff --git a/stats/service.go b/stats/service.go old mode 100644 new mode 100755 index 9eebe388d..a8ef867d0 --- a/stats/service.go +++ b/stats/service.go @@ -42,7 +42,7 @@ func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval tim } ss.stInsts = make(StatsInstances, len(sqPrfxs)) for i, prfx := range sqPrfxs { - sq, err := dataDB.GetStatsQueue(prfx[len(utils.StatsQueuePrefix):], false, utils.NonTransactional) + sq, err := dataDB.GetStatsQueue(prfx[len(utils.StatsQueuePrefix):]) if err != nil { return nil, err }