mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Small fix for TPStats
This commit is contained in:
@@ -79,3 +79,16 @@ func (rsv1 *ResourceSV1) AllocateResource(args utils.AttrRLsResourceUsage, reply
|
||||
func (rsv1 *ResourceSV1) ReleaseResource(args utils.AttrRLsResourceUsage, reply *string) error {
|
||||
return rsv1.rls.V1ReleaseResource(args, reply)
|
||||
}
|
||||
|
||||
//after implement test for it
|
||||
func (apierV1 *ApierV1) GetResourceConfig() {
|
||||
|
||||
}
|
||||
|
||||
func (apierV1 *ApierV1) SetResourceConfig() {
|
||||
|
||||
}
|
||||
|
||||
func (apierV1 *ApierV1) RemResourceConfig() {
|
||||
|
||||
}
|
||||
|
||||
@@ -87,3 +87,16 @@ func (stsv1 *StatSV1) GetFloatMetrics(queueID string, reply *map[string]float64)
|
||||
func (stsv1 *StatSV1) LoadQueues(args stats.ArgsLoadQueues, reply *string) (err error) {
|
||||
return stsv1.sts.V1LoadQueues(args, reply)
|
||||
}
|
||||
|
||||
//after implement test for it
|
||||
func (apierV1 *ApierV1) GetStatConfig() {
|
||||
|
||||
}
|
||||
|
||||
func (apierV1 *ApierV1) SetStatConfig() {
|
||||
|
||||
}
|
||||
|
||||
func (apierV1 *ApierV1) RemStatConfig() {
|
||||
|
||||
}
|
||||
|
||||
@@ -31,15 +31,16 @@ import (
|
||||
var tpCfgPath string
|
||||
var tpCfg *config.CGRConfig
|
||||
var tpRPC *rpc.Client
|
||||
var tpDataDir = "/usr/share/cgrates"
|
||||
|
||||
func TestTPStatInitCfg(t *testing.T) {
|
||||
var err error
|
||||
tpCfgPath = path.Join(*dataDir, "conf", "samples", "tutmysql")
|
||||
tpCfgPath = path.Join(tpDataDir, "conf", "samples", "tutmysql")
|
||||
tpCfg, err = config.NewCGRConfigFromFolder(tpCfgPath)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
tpCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush()
|
||||
tpCfg.DataFolderPath = tpDataDir // Share DataFolderPath through config towards StoreDb for Flush()
|
||||
config.SetCgrConfig(tpCfg)
|
||||
}
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ type SQStoredMetrics struct {
|
||||
}
|
||||
|
||||
// StatsQueue represents the configuration of a StatsInstance in StatS
|
||||
type StatsQueue struct {
|
||||
type StatsConfig struct {
|
||||
ID string // QueueID
|
||||
Filters []*RequestFilter
|
||||
ActivationInterval *utils.ActivationInterval // Activation interval
|
||||
@@ -2056,8 +2056,8 @@ func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) {
|
||||
return
|
||||
}
|
||||
|
||||
func APItoStats(tpST *utils.TPStats, timezone string) (st *StatsQueue, err error) {
|
||||
st = &StatsQueue{
|
||||
func APItoStats(tpST *utils.TPStats, timezone string) (st *StatsConfig, err error) {
|
||||
st = &StatsConfig{
|
||||
ID: tpST.ID,
|
||||
QueueLength: tpST.QueueLength,
|
||||
Weight: tpST.Weight,
|
||||
|
||||
@@ -189,6 +189,7 @@ func TestApierTPTimingAsExportSlice(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
/* De completat functia de test pentru ModelStats
|
||||
func TestAPItoModelStats(t *testing.T) {
|
||||
tpS := &utils.TPStats{
|
||||
TPid: "TPS1",
|
||||
@@ -211,12 +212,24 @@ func TestAPItoModelStats(t *testing.T) {
|
||||
Weight: 20,
|
||||
Thresholds: nil,
|
||||
}
|
||||
expectedSlc := [][]string{
|
||||
[]string{,"TPS1", "*Stat1", "*string", "*Account", "1002", "2014-07-29T15:00:00Z","","1","MetricValue",},
|
||||
}
|
||||
expectedtpS := APItoModelStats(tpS)
|
||||
var slc [][]string
|
||||
lc, err := csvDump(expectedtpS)
|
||||
if err != nil {
|
||||
t.Error("Error dumping to csv: ", err)
|
||||
}
|
||||
slc = append(slc, lc)
|
||||
|
||||
if !reflect.DeepEqual(expectedtpS, tpS) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", expectedtpS, tpS)
|
||||
t.Errorf("Expecting: %+v, received: %+v", expectedtpS, slc)
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
func TestTPRatingPlanAsExportSlice(t *testing.T) {
|
||||
tpRpln := &utils.TPRatingPlan{
|
||||
TPid: "TEST_TPID",
|
||||
@@ -888,7 +901,7 @@ func TestAPItoTPStats(t *testing.T) {
|
||||
Weight: 20.0,
|
||||
}
|
||||
|
||||
eTPs := &StatsQueue{ID: tps.ID,
|
||||
eTPs := &StatsConfig{ID: tps.ID,
|
||||
QueueLength: tps.QueueLength,
|
||||
Metrics: []string{"*asr", "*acd", "*acc"},
|
||||
Thresholds: []string{"THRESH1", "THRESH2"},
|
||||
|
||||
@@ -111,9 +111,10 @@ type DataDB interface {
|
||||
GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error)
|
||||
SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error)
|
||||
MatchReqFilterIndex(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error)
|
||||
GetStatsQueue(sqID string) (sq *StatsQueue, err error)
|
||||
SetStatsQueue(sq *StatsQueue) (err error)
|
||||
RemStatsQueue(sqID string) (err error)
|
||||
//modicari
|
||||
GetStatsConfig(sqID string) (sq *StatsConfig, err error)
|
||||
SetStatsConfig(sq *StatsConfig) (err error)
|
||||
RemStatsConfig(sqID string) (err error)
|
||||
GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error)
|
||||
SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error)
|
||||
RemSQStoredMetrics(sqID string) (err error)
|
||||
|
||||
@@ -1475,10 +1475,10 @@ func (ms *MapStorage) RemoveVersions(vrs Versions) (err error) {
|
||||
}
|
||||
|
||||
// GetStatsQueue retrieves a StatsQueue from dataDB
|
||||
func (ms *MapStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
|
||||
func (ms *MapStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
key := utils.StatsQueuePrefix + sqID
|
||||
key := utils.StatsConfigPrefix + sqID
|
||||
values, ok := ms.dict[key]
|
||||
if !ok {
|
||||
return nil, utils.ErrNotFound
|
||||
@@ -1496,22 +1496,22 @@ func (ms *MapStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
|
||||
}
|
||||
|
||||
// SetStatsQueue stores a StatsQueue into DataDB
|
||||
func (ms *MapStorage) SetStatsQueue(sq *StatsQueue) (err error) {
|
||||
func (ms *MapStorage) SetStatsConfig(sq *StatsConfig) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
result, err := ms.ms.Marshal(sq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ms.dict[utils.StatsQueuePrefix+sq.ID] = result
|
||||
ms.dict[utils.StatsConfigPrefix+sq.ID] = result
|
||||
return
|
||||
}
|
||||
|
||||
// RemStatsQueue removes a StatsQueue from dataDB
|
||||
func (ms *MapStorage) RemStatsQueue(sqID string) (err error) {
|
||||
func (ms *MapStorage) RemStatsConfig(sqID string) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
key := utils.StatsQueuePrefix + sqID
|
||||
key := utils.StatsConfigPrefix + sqID
|
||||
delete(ms.dict, key)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1985,10 +1985,10 @@ func (ms *MongoStorage) MatchReqFilterIndex(dbKey, fldName, fldVal string) (item
|
||||
}
|
||||
|
||||
// GetStatsQueue retrieves a StatsQueue from dataDB
|
||||
func (ms *MongoStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
|
||||
session, col := ms.conn(utils.StatsQueuePrefix)
|
||||
func (ms *MongoStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) {
|
||||
session, col := ms.conn(utils.StatsConfigPrefix)
|
||||
defer session.Close()
|
||||
sq = new(StatsQueue)
|
||||
sq = new(StatsConfig)
|
||||
if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
err = utils.ErrNotFound
|
||||
@@ -2004,16 +2004,16 @@ func (ms *MongoStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
|
||||
}
|
||||
|
||||
// SetStatsQueue stores a StatsQueue into DataDB
|
||||
func (ms *MongoStorage) SetStatsQueue(sq *StatsQueue) (err error) {
|
||||
session, col := ms.conn(utils.StatsQueuePrefix)
|
||||
func (ms *MongoStorage) SetStatsConfig(sq *StatsConfig) (err error) {
|
||||
session, col := ms.conn(utils.StatsConfigPrefix)
|
||||
defer session.Close()
|
||||
_, err = col.UpsertId(bson.M{"id": sq.ID}, sq)
|
||||
return
|
||||
}
|
||||
|
||||
// RemStatsQueue removes a StatsQueue from dataDB
|
||||
func (ms *MongoStorage) RemStatsQueue(sqID string) (err error) {
|
||||
session, col := ms.conn(utils.StatsQueuePrefix)
|
||||
func (ms *MongoStorage) RemStatsConfig(sqID string) (err error) {
|
||||
session, col := ms.conn(utils.StatsConfigPrefix)
|
||||
err = col.Remove(bson.M{"id": sqID})
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -1542,8 +1542,8 @@ func (rs *RedisStorage) RemoveVersions(vrs Versions) (err error) {
|
||||
}
|
||||
|
||||
// GetStatsQueue retrieves a StatsQueue from dataDB
|
||||
func (rs *RedisStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
|
||||
key := utils.StatsQueuePrefix + sqID
|
||||
func (rs *RedisStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) {
|
||||
key := utils.StatsConfigPrefix + sqID
|
||||
var values []byte
|
||||
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
|
||||
if err == redis.ErrRespNil {
|
||||
@@ -1563,18 +1563,18 @@ func (rs *RedisStorage) GetStatsQueue(sqID string) (sq *StatsQueue, err error) {
|
||||
}
|
||||
|
||||
// SetStatsQueue stores a StatsQueue into DataDB
|
||||
func (rs *RedisStorage) SetStatsQueue(sq *StatsQueue) (err error) {
|
||||
func (rs *RedisStorage) SetStatsConfig(sq *StatsConfig) (err error) {
|
||||
var result []byte
|
||||
result, err = rs.ms.Marshal(sq)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return rs.Cmd("SET", utils.StatsQueuePrefix+sq.ID, result).Err
|
||||
return rs.Cmd("SET", utils.StatsConfigPrefix+sq.ID, result).Err
|
||||
}
|
||||
|
||||
// RemStatsQueue removes a StatsQueue from dataDB
|
||||
func (rs *RedisStorage) RemStatsQueue(sqID string) (err error) {
|
||||
key := utils.StatsQueuePrefix + sqID
|
||||
func (rs *RedisStorage) RemStatsConfig(sqID string) (err error) {
|
||||
key := utils.StatsConfigPrefix + sqID
|
||||
err = rs.Cmd("DEL", key).Err
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1955,7 +1955,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = tpr.dataStorage.SetStatsQueue(st); err != nil {
|
||||
if err = tpr.dataStorage.SetStatsConfig(st); err != nil {
|
||||
return err
|
||||
}
|
||||
if verbose {
|
||||
|
||||
@@ -49,7 +49,7 @@ func (sis StatsInstances) remWithID(qID string) {
|
||||
|
||||
// NewStatsInstance instantiates a StatsInstance
|
||||
func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler,
|
||||
sqCfg *engine.StatsQueue, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) {
|
||||
sqCfg *engine.StatsConfig, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) {
|
||||
si = &StatsInstance{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)}
|
||||
for _, metricID := range sqCfg.Metrics {
|
||||
if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil {
|
||||
@@ -85,7 +85,7 @@ type StatsInstance struct {
|
||||
sqItems []*engine.SQItem
|
||||
sqMetrics map[string]StatsMetric
|
||||
ms engine.Marshaler // used to get/set Metrics
|
||||
cfg *engine.StatsQueue
|
||||
cfg *engine.StatsConfig
|
||||
}
|
||||
|
||||
// GetSQStoredMetrics retrieves the data used for store to DB
|
||||
|
||||
@@ -26,17 +26,17 @@ import (
|
||||
|
||||
func TestStatsInstancesSort(t *testing.T) {
|
||||
sInsts := StatsInstances{
|
||||
&StatsInstance{cfg: &engine.StatsQueue{ID: "FIRST", Weight: 30.0}},
|
||||
&StatsInstance{cfg: &engine.StatsQueue{ID: "SECOND", Weight: 40.0}},
|
||||
&StatsInstance{cfg: &engine.StatsQueue{ID: "THIRD", Weight: 30.0}},
|
||||
&StatsInstance{cfg: &engine.StatsQueue{ID: "FOURTH", Weight: 35.0}},
|
||||
&StatsInstance{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
|
||||
&StatsInstance{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
|
||||
&StatsInstance{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
|
||||
&StatsInstance{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
|
||||
}
|
||||
sInsts.Sort()
|
||||
eSInst := StatsInstances{
|
||||
&StatsInstance{cfg: &engine.StatsQueue{ID: "SECOND", Weight: 40.0}},
|
||||
&StatsInstance{cfg: &engine.StatsQueue{ID: "FOURTH", Weight: 35.0}},
|
||||
&StatsInstance{cfg: &engine.StatsQueue{ID: "FIRST", Weight: 30.0}},
|
||||
&StatsInstance{cfg: &engine.StatsQueue{ID: "THIRD", Weight: 30.0}},
|
||||
&StatsInstance{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
|
||||
&StatsInstance{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
|
||||
&StatsInstance{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
|
||||
&StatsInstance{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
|
||||
}
|
||||
if !reflect.DeepEqual(eSInst, sInsts) {
|
||||
t.Errorf("expecting: %+v, received: %+v", eSInst, sInsts)
|
||||
|
||||
@@ -39,14 +39,14 @@ func init() {
|
||||
func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval time.Duration) (ss *StatService, err error) {
|
||||
ss = &StatService{dataDB: dataDB, ms: ms, storeInterval: storeInterval,
|
||||
stopStoring: make(chan struct{}), evCache: NewStatsEventCache()}
|
||||
sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsQueuePrefix)
|
||||
sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsConfigPrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ss.queuesCache = make(map[string]*StatsInstance)
|
||||
ss.queues = make(StatsInstances, 0)
|
||||
for _, prfx := range sqPrfxs {
|
||||
if q, err := ss.loadQueue(prfx[len(utils.StatsQueuePrefix):]); err != nil {
|
||||
if q, err := ss.loadQueue(prfx[len(utils.StatsConfigPrefix):]); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<StatS> failed loading quueue with id: <%s>, err: <%s>",
|
||||
q.cfg.ID, err.Error()))
|
||||
continue
|
||||
@@ -92,7 +92,7 @@ func (ss *StatService) Shutdown() error {
|
||||
// setQueue adds or modifies a queue into cache
|
||||
// sort will reorder the ss.queues
|
||||
func (ss *StatService) loadQueue(qID string) (q *StatsInstance, err error) {
|
||||
sq, err := ss.dataDB.GetStatsQueue(qID)
|
||||
sq, err := ss.dataDB.GetStatsConfig(qID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -225,13 +225,13 @@ type ArgsLoadQueues struct {
|
||||
func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) {
|
||||
qIDs := args.QueueIDs
|
||||
if qIDs == nil {
|
||||
sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsQueuePrefix)
|
||||
sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsConfigPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
queueIDs := make([]string, len(sqPrfxs))
|
||||
for i, prfx := range sqPrfxs {
|
||||
queueIDs[i] = prfx[len(utils.StatsQueuePrefix):]
|
||||
queueIDs[i] = prfx[len(utils.StatsConfigPrefix):]
|
||||
}
|
||||
if len(queueIDs) != 0 {
|
||||
qIDs = &queueIDs
|
||||
|
||||
@@ -32,8 +32,8 @@ func TestReqFilterPassStatS(t *testing.T) {
|
||||
config.SetCgrConfig(cgrCfg)
|
||||
}
|
||||
dataStorage, _ := engine.NewMapStorage()
|
||||
dataStorage.SetStatsQueue(
|
||||
&engine.StatsQueue{ID: "CDRST1",
|
||||
dataStorage.SetStatsConfig(
|
||||
&engine.StatsConfig{ID: "CDRST1",
|
||||
Filters: []*engine.RequestFilter{
|
||||
&engine.RequestFilter{Type: engine.MetaString, FieldName: "Tenant",
|
||||
Values: []string{"cgrates.org"}}},
|
||||
|
||||
@@ -251,7 +251,7 @@ const (
|
||||
LOG_CDR = "cdr_"
|
||||
LOG_MEDIATED_CDR = "mcd_"
|
||||
SQStoredMetricsPrefix = "ssm_"
|
||||
StatsQueuePrefix = "stq_"
|
||||
StatsConfigPrefix = "stq_"
|
||||
ThresholdCfgPrefix = "thc_"
|
||||
LOADINST_KEY = "load_history"
|
||||
SESSION_MANAGER_SOURCE = "SMR"
|
||||
|
||||
Reference in New Issue
Block a user