mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Merge branch 'master' of https://github.com/cgrates/cgrates
This commit is contained in:
@@ -93,11 +93,11 @@ type AttrGetStatsCfg struct {
|
||||
}
|
||||
|
||||
//GetStatConfig returns a stat configuration
|
||||
func (apierV1 *ApierV1) GetStatConfig(attr AttrGetStatsCfg, reply *engine.StatsConfig) error {
|
||||
func (apierV1 *ApierV1) GetStatQueueProfile(attr AttrGetStatsCfg, reply *engine.StatQueueProfile) error {
|
||||
if missing := utils.MissingStructFields(&attr, []string{"ID"}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
if sCfg, err := apierV1.DataDB.GetStatsConfig(attr.ID); err != nil {
|
||||
if sCfg, err := apierV1.DataDB.GetStatQueueProfile(attr.ID); err != nil {
|
||||
if err.Error() != utils.ErrNotFound.Error() {
|
||||
err = utils.NewErrServerError(err)
|
||||
}
|
||||
@@ -109,11 +109,11 @@ func (apierV1 *ApierV1) GetStatConfig(attr AttrGetStatsCfg, reply *engine.StatsC
|
||||
}
|
||||
|
||||
//SetStatConfig add a new stat configuration
|
||||
func (apierV1 *ApierV1) SetStatConfig(attr *engine.StatsConfig, reply *string) error {
|
||||
func (apierV1 *ApierV1) SetStatQueueProfile(attr *engine.StatQueueProfile, reply *string) error {
|
||||
if missing := utils.MissingStructFields(attr, []string{"ID"}); len(missing) != 0 {
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
if err := apierV1.DataDB.SetStatsConfig(attr); err != nil {
|
||||
if err := apierV1.DataDB.SetStatQueueProfile(attr); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
*reply = utils.OK
|
||||
@@ -122,11 +122,11 @@ func (apierV1 *ApierV1) SetStatConfig(attr *engine.StatsConfig, reply *string) e
|
||||
}
|
||||
|
||||
//Remove a specific stat configuration
|
||||
func (apierV1 *ApierV1) RemStatConfig(attrs AttrGetStatsCfg, reply *string) error {
|
||||
func (apierV1 *ApierV1) RemStatQueueProfile(attrs AttrGetStatsCfg, reply *string) error {
|
||||
if missing := utils.MissingStructFields(&attrs, []string{"ID"}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
if err := apierV1.DataDB.RemStatsConfig(attrs.ID); err != nil {
|
||||
if err := apierV1.DataDB.RemStatQueueProfile(attrs.ID); err != nil {
|
||||
if err.Error() != utils.ErrNotFound.Error() {
|
||||
err = utils.NewErrServerError(err)
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ var (
|
||||
stsV1CfgPath string
|
||||
stsV1Cfg *config.CGRConfig
|
||||
stsV1Rpc *rpc.Client
|
||||
statConfig *engine.StatsConfig
|
||||
statConfig *engine.StatQueueProfile
|
||||
stsV1ConfDIR string //run tests for specific configuration
|
||||
statsDelay int
|
||||
)
|
||||
@@ -66,13 +66,13 @@ var sTestsStatSV1 = []func(t *testing.T){
|
||||
testV1STSFromFolder,
|
||||
testV1STSGetStats,
|
||||
testV1STSProcessEvent,
|
||||
testV1STSGetStatConfigBeforeSet,
|
||||
testV1STSSetStatConfig,
|
||||
testV1STSGetStatAfterSet,
|
||||
testV1STSUpdateStatConfig,
|
||||
testV1STSGetStatAfterUpdate,
|
||||
testV1STSRemoveStatConfig,
|
||||
testV1STSGetStatConfigAfterRemove,
|
||||
testV1STSGetStatQueueProfileBeforeSet,
|
||||
testV1STSSetStatQueueProfile,
|
||||
testV1STSGetStatQueueProfileAfterSet,
|
||||
testV1STSUpdateStatQueueProfile,
|
||||
testV1STSGetStatQueueProfileAfterUpdate,
|
||||
testV1STSRemoveStatQueueProfile,
|
||||
testV1STSGetStatQueueProfileAfterRemove,
|
||||
testV1STSStopEngine,
|
||||
}
|
||||
|
||||
@@ -209,15 +209,15 @@ func testV1STSProcessEvent(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testV1STSGetStatConfigBeforeSet(t *testing.T) {
|
||||
var reply *engine.StatsConfig
|
||||
if err := stsV1Rpc.Call("ApierV1.GetStatConfig", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
func testV1STSGetStatQueueProfileBeforeSet(t *testing.T) {
|
||||
var reply *engine.StatQueueProfile
|
||||
if err := stsV1Rpc.Call("ApierV1.GetStatQueueProfile", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testV1STSSetStatConfig(t *testing.T) {
|
||||
statConfig = &engine.StatsConfig{
|
||||
func testV1STSSetStatQueueProfile(t *testing.T) {
|
||||
statConfig = &engine.StatQueueProfile{
|
||||
ID: "SCFG1",
|
||||
Filters: []*engine.RequestFilter{
|
||||
&engine.RequestFilter{
|
||||
@@ -240,23 +240,23 @@ func testV1STSSetStatConfig(t *testing.T) {
|
||||
Weight: 20,
|
||||
}
|
||||
var result string
|
||||
if err := stsV1Rpc.Call("ApierV1.SetStatConfig", statConfig, &result); err != nil {
|
||||
if err := stsV1Rpc.Call("ApierV1.SetStatQueueProfile", statConfig, &result); err != nil {
|
||||
t.Error(err)
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
}
|
||||
}
|
||||
|
||||
func testV1STSGetStatAfterSet(t *testing.T) {
|
||||
var reply *engine.StatsConfig
|
||||
if err := stsV1Rpc.Call("ApierV1.GetStatConfig", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err != nil {
|
||||
func testV1STSGetStatQueueProfileAfterSet(t *testing.T) {
|
||||
var reply *engine.StatQueueProfile
|
||||
if err := stsV1Rpc.Call("ApierV1.GetStatQueueProfile", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(statConfig, reply) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", statConfig, reply)
|
||||
}
|
||||
}
|
||||
|
||||
func testV1STSUpdateStatConfig(t *testing.T) {
|
||||
func testV1STSUpdateStatQueueProfile(t *testing.T) {
|
||||
var result string
|
||||
statConfig.Filters = []*engine.RequestFilter{
|
||||
&engine.RequestFilter{
|
||||
@@ -275,34 +275,34 @@ func testV1STSUpdateStatConfig(t *testing.T) {
|
||||
Values: []string{"10", "20"},
|
||||
},
|
||||
}
|
||||
if err := stsV1Rpc.Call("ApierV1.SetStatConfig", statConfig, &result); err != nil {
|
||||
if err := stsV1Rpc.Call("ApierV1.SetStatQueueProfile", statConfig, &result); err != nil {
|
||||
t.Error(err)
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
}
|
||||
}
|
||||
|
||||
func testV1STSGetStatAfterUpdate(t *testing.T) {
|
||||
var reply *engine.StatsConfig
|
||||
if err := stsV1Rpc.Call("ApierV1.GetStatConfig", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err != nil {
|
||||
func testV1STSGetStatQueueProfileAfterUpdate(t *testing.T) {
|
||||
var reply *engine.StatQueueProfile
|
||||
if err := stsV1Rpc.Call("ApierV1.GetStatQueueProfile", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(statConfig, reply) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", statConfig, reply)
|
||||
}
|
||||
}
|
||||
|
||||
func testV1STSRemoveStatConfig(t *testing.T) {
|
||||
func testV1STSRemoveStatQueueProfile(t *testing.T) {
|
||||
var resp string
|
||||
if err := stsV1Rpc.Call("ApierV1.RemStatConfig", &AttrGetStatsCfg{ID: "SCFG1"}, &resp); err != nil {
|
||||
if err := stsV1Rpc.Call("ApierV1.RemStatQueueProfile", &AttrGetStatsCfg{ID: "SCFG1"}, &resp); err != nil {
|
||||
t.Error(err)
|
||||
} else if resp != utils.OK {
|
||||
t.Error("Unexpected reply returned", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func testV1STSGetStatConfigAfterRemove(t *testing.T) {
|
||||
var reply *engine.StatsConfig
|
||||
if err := stsV1Rpc.Call("ApierV1.GetStatConfig", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
func testV1STSGetStatQueueProfileAfterRemove(t *testing.T) {
|
||||
var reply *engine.StatQueueProfile
|
||||
if err := stsV1Rpc.Call("ApierV1.GetStatQueueProfile", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,7 +87,7 @@ func testTPStatsInitCfg(t *testing.T) {
|
||||
tpStatCfg.DataFolderPath = tpStatDataDir // Share DataFolderPath through config towards StoreDb for Flush()
|
||||
config.SetCgrConfig(tpStatCfg)
|
||||
switch tpStatConfigDIR {
|
||||
case "tutmongo": // Mongo needs more time to reset db, need to investigate
|
||||
case "tutmongo": // Mongo needs more time to reset db
|
||||
tpStatDelay = 4000
|
||||
default:
|
||||
tpStatDelay = 1000
|
||||
|
||||
@@ -39,7 +39,7 @@ type SQStoredMetrics struct {
|
||||
}
|
||||
|
||||
// StatsConfig represents the configuration of a StatsInstance in StatS
|
||||
type StatsConfig struct {
|
||||
type StatQueueProfile struct {
|
||||
ID string // QueueID
|
||||
Filters []*RequestFilter
|
||||
ActivationInterval *utils.ActivationInterval // Activation interval
|
||||
|
||||
@@ -324,7 +324,7 @@ func TestLoaderITWriteToDatabase(t *testing.T) {
|
||||
}
|
||||
|
||||
for k, st := range loader.stats {
|
||||
rcv, err := loader.dataStorage.GetStatsConfig(k)
|
||||
rcv, err := loader.dataStorage.GetStatQueueProfile(k)
|
||||
if err != nil {
|
||||
t.Error("Failed GetStatsQueue: ", err.Error())
|
||||
}
|
||||
|
||||
@@ -2069,8 +2069,8 @@ func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) {
|
||||
return
|
||||
}
|
||||
|
||||
func APItoStats(tpST *utils.TPStats, timezone string) (st *StatsConfig, err error) {
|
||||
st = &StatsConfig{
|
||||
func APItoStats(tpST *utils.TPStats, timezone string) (st *StatQueueProfile, err error) {
|
||||
st = &StatQueueProfile{
|
||||
ID: tpST.ID,
|
||||
QueueLength: tpST.QueueLength,
|
||||
Weight: tpST.Weight,
|
||||
|
||||
@@ -901,7 +901,7 @@ func TestAPItoTPStats(t *testing.T) {
|
||||
Weight: 20.0,
|
||||
}
|
||||
|
||||
eTPs := &StatsConfig{ID: tps.ID,
|
||||
eTPs := &StatQueueProfile{ID: tps.ID,
|
||||
QueueLength: tps.QueueLength,
|
||||
Metrics: []string{"*asr", "*acd", "*acc"},
|
||||
Thresholds: []string{"THRESH1", "THRESH2"},
|
||||
|
||||
@@ -90,7 +90,7 @@ var sTestsOnStorIT = []func(t *testing.T){
|
||||
testOnStorITCRUDHistory,
|
||||
testOnStorITCRUDStructVersion,
|
||||
testOnStorITCRUDSQStoredMetrics,
|
||||
testOnStorITCRUDStats,
|
||||
testOnStorITCRUDStatQueueProfile,
|
||||
testOnStorITCRUDThresholdCfg,
|
||||
}
|
||||
|
||||
@@ -1973,9 +1973,9 @@ func testOnStorITCRUDSQStoredMetrics(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testOnStorITCRUDStats(t *testing.T) {
|
||||
func testOnStorITCRUDStatQueueProfile(t *testing.T) {
|
||||
timeTTL := time.Duration(0 * time.Second)
|
||||
sq := &StatsConfig{
|
||||
sq := &StatQueueProfile{
|
||||
ID: "test",
|
||||
ActivationInterval: &utils.ActivationInterval{},
|
||||
Filters: []*RequestFilter{},
|
||||
@@ -1985,33 +1985,33 @@ func testOnStorITCRUDStats(t *testing.T) {
|
||||
Store: true,
|
||||
Thresholds: []string{},
|
||||
}
|
||||
if _, rcvErr := onStor.GetStatsConfig(sq.ID); rcvErr != utils.ErrNotFound {
|
||||
if _, rcvErr := onStor.GetStatQueueProfile(sq.ID); rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
if _, ok := cache.Get(utils.StatsConfigPrefix + sq.ID); ok != false {
|
||||
if _, ok := cache.Get(utils.StatQueueProfilePrefix + sq.ID); ok != false {
|
||||
t.Error("Should not be in cache")
|
||||
}
|
||||
if err := onStor.SetStatsConfig(sq); err != nil {
|
||||
if err := onStor.SetStatQueueProfile(sq); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, ok := cache.Get(utils.StatsConfigPrefix + sq.ID); ok != false {
|
||||
if _, ok := cache.Get(utils.StatQueueProfilePrefix + sq.ID); ok != false {
|
||||
t.Error("Should not be in cache")
|
||||
}
|
||||
if rcv, err := onStor.GetStatsConfig(sq.ID); err != nil {
|
||||
if rcv, err := onStor.GetStatQueueProfile(sq.ID); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(sq, rcv) {
|
||||
t.Errorf("Expecting: %v, received: %v", sq, rcv)
|
||||
}
|
||||
if _, ok := cache.Get(utils.StatsConfigPrefix + sq.ID); ok != false {
|
||||
if _, ok := cache.Get(utils.StatQueueProfilePrefix + sq.ID); ok != false {
|
||||
t.Error("Should not be in cache")
|
||||
}
|
||||
if err := onStor.RemStatsConfig(sq.ID); err != nil {
|
||||
if err := onStor.RemStatQueueProfile(sq.ID); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, ok := cache.Get(utils.StatsConfigPrefix + sq.ID); ok != false {
|
||||
if _, ok := cache.Get(utils.StatQueueProfilePrefix + sq.ID); ok != false {
|
||||
t.Error("Should not be in cache")
|
||||
}
|
||||
if _, rcvErr := onStor.GetStatsConfig(sq.ID); rcvErr != utils.ErrNotFound {
|
||||
if _, rcvErr := onStor.GetStatQueueProfile(sq.ID); rcvErr != utils.ErrNotFound {
|
||||
t.Error(rcvErr)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,9 +112,9 @@ type DataDB interface {
|
||||
GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error)
|
||||
SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error)
|
||||
MatchReqFilterIndex(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error)
|
||||
GetStatsConfig(sqID string) (sq *StatsConfig, err error)
|
||||
SetStatsConfig(sq *StatsConfig) (err error)
|
||||
RemStatsConfig(sqID string) (err error)
|
||||
GetStatQueueProfile(sqID string) (sq *StatQueueProfile, err error)
|
||||
SetStatQueueProfile(sq *StatQueueProfile) (err error)
|
||||
RemStatQueueProfile(sqID string) (err error)
|
||||
GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error)
|
||||
SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error)
|
||||
RemSQStoredMetrics(sqID string) (err error)
|
||||
|
||||
@@ -1472,10 +1472,10 @@ func (ms *MapStorage) MatchReqFilterIndex(dbKey, fldName, fldVal string) (itemID
|
||||
}
|
||||
|
||||
// GetStatsQueue retrieves a StatsQueue from dataDB
|
||||
func (ms *MapStorage) GetStatsConfig(sqID string) (scf *StatsConfig, err error) {
|
||||
func (ms *MapStorage) GetStatQueueProfile(sqID string) (scf *StatQueueProfile, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
key := utils.StatsConfigPrefix + sqID
|
||||
key := utils.StatQueueProfilePrefix + sqID
|
||||
values, ok := ms.dict[key]
|
||||
if !ok {
|
||||
return nil, utils.ErrNotFound
|
||||
@@ -1493,22 +1493,22 @@ func (ms *MapStorage) GetStatsConfig(sqID string) (scf *StatsConfig, err error)
|
||||
}
|
||||
|
||||
// SetStatsQueue stores a StatsQueue into DataDB
|
||||
func (ms *MapStorage) SetStatsConfig(scf *StatsConfig) (err error) {
|
||||
func (ms *MapStorage) SetStatQueueProfile(scf *StatQueueProfile) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
result, err := ms.ms.Marshal(scf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ms.dict[utils.StatsConfigPrefix+scf.ID] = result
|
||||
ms.dict[utils.StatQueueProfilePrefix+scf.ID] = result
|
||||
return
|
||||
}
|
||||
|
||||
// RemStatsQueue removes a StatsQueue from dataDB
|
||||
func (ms *MapStorage) RemStatsConfig(scfID string) (err error) {
|
||||
func (ms *MapStorage) RemStatQueueProfile(scfID string) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
key := utils.StatsConfigPrefix + scfID
|
||||
key := utils.StatQueueProfilePrefix + scfID
|
||||
delete(ms.dict, key)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ const (
|
||||
colLht = "load_history"
|
||||
colVer = "versions"
|
||||
colRsP = "resource_profiles"
|
||||
colSts = "stats"
|
||||
colSqp = "stat_queue_profiles"
|
||||
colRFI = "request_filter_indexes"
|
||||
colTmg = "timings"
|
||||
colRes = "resources"
|
||||
@@ -325,7 +325,7 @@ func (ms *MongoStorage) getColNameForPrefix(prefix string) (name string, ok bool
|
||||
utils.LOADINST_KEY: colLht,
|
||||
utils.VERSION_PREFIX: colVer,
|
||||
utils.ResourceProfilesPrefix: colRsP,
|
||||
utils.StatsPrefix: colSts,
|
||||
utils.StatsPrefix: colStq,
|
||||
utils.TimingsPrefix: colTmg,
|
||||
utils.ResourcesPrefix: colRes,
|
||||
}
|
||||
@@ -631,14 +631,14 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er
|
||||
result = append(result, utils.ResourcesPrefix+idResult.Id)
|
||||
}
|
||||
case utils.StatsPrefix:
|
||||
iter := db.C(colSts).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
iter := db.C(colStq).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
for iter.Next(&idResult) {
|
||||
result = append(result, utils.StatsPrefix+idResult.Id)
|
||||
}
|
||||
case utils.StatsConfigPrefix:
|
||||
iter := db.C(colStq).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
case utils.StatQueueProfilePrefix:
|
||||
iter := db.C(colSqp).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
for iter.Next(&idResult) {
|
||||
result = append(result, utils.StatsConfigPrefix+idResult.Id)
|
||||
result = append(result, utils.StatQueueProfilePrefix+idResult.Id)
|
||||
}
|
||||
case utils.AccountActionPlansPrefix:
|
||||
iter := db.C(colAAp).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
@@ -2008,10 +2008,10 @@ func (ms *MongoStorage) MatchReqFilterIndex(dbKey, fldName, fldVal string) (item
|
||||
}
|
||||
|
||||
// GetStatsQueue retrieves a StatsQueue from dataDB
|
||||
func (ms *MongoStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) {
|
||||
session, col := ms.conn(utils.StatsConfigPrefix)
|
||||
func (ms *MongoStorage) GetStatQueueProfile(sqID string) (sq *StatQueueProfile, err error) {
|
||||
session, col := ms.conn(utils.StatQueueProfilePrefix)
|
||||
defer session.Close()
|
||||
sq = new(StatsConfig)
|
||||
sq = new(StatQueueProfile)
|
||||
if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
err = utils.ErrNotFound
|
||||
@@ -2027,16 +2027,16 @@ func (ms *MongoStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error)
|
||||
}
|
||||
|
||||
// SetStatsQueue stores a StatsQueue into DataDB
|
||||
func (ms *MongoStorage) SetStatsConfig(sq *StatsConfig) (err error) {
|
||||
session, col := ms.conn(utils.StatsConfigPrefix)
|
||||
func (ms *MongoStorage) SetStatQueueProfile(sq *StatQueueProfile) (err error) {
|
||||
session, col := ms.conn(utils.StatQueueProfilePrefix)
|
||||
defer session.Close()
|
||||
_, err = col.UpsertId(bson.M{"id": sq.ID}, sq)
|
||||
return
|
||||
}
|
||||
|
||||
// RemStatsQueue removes a StatsQueue from dataDB
|
||||
func (ms *MongoStorage) RemStatsConfig(sqID string) (err error) {
|
||||
session, col := ms.conn(utils.StatsConfigPrefix)
|
||||
func (ms *MongoStorage) RemStatQueueProfile(sqID string) (err error) {
|
||||
session, col := ms.conn(utils.StatQueueProfilePrefix)
|
||||
err = col.Remove(bson.M{"id": sqID})
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -1583,8 +1583,8 @@ func (rs *RedisStorage) RemoveVersions(vrs Versions) (err error) {
|
||||
}
|
||||
|
||||
// GetStatsConfig retrieves a StatsConfig from dataDB
|
||||
func (rs *RedisStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) {
|
||||
key := utils.StatsConfigPrefix + sqID
|
||||
func (rs *RedisStorage) GetStatQueueProfile(sqID string) (sq *StatQueueProfile, err error) {
|
||||
key := utils.StatQueueProfilePrefix + sqID
|
||||
var values []byte
|
||||
if values, err = rs.Cmd("GET", key).Bytes(); err != nil {
|
||||
if err == redis.ErrRespNil {
|
||||
@@ -1604,18 +1604,18 @@ func (rs *RedisStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error)
|
||||
}
|
||||
|
||||
// SetStatsQueue stores a StatsQueue into DataDB
|
||||
func (rs *RedisStorage) SetStatsConfig(sq *StatsConfig) (err error) {
|
||||
func (rs *RedisStorage) SetStatQueueProfile(sq *StatQueueProfile) (err error) {
|
||||
var result []byte
|
||||
result, err = rs.ms.Marshal(sq)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return rs.Cmd("SET", utils.StatsConfigPrefix+sq.ID, result).Err
|
||||
return rs.Cmd("SET", utils.StatQueueProfilePrefix+sq.ID, result).Err
|
||||
}
|
||||
|
||||
// RemStatsQueue removes a StatsQueue from dataDB
|
||||
func (rs *RedisStorage) RemStatsConfig(sqID string) (err error) {
|
||||
key := utils.StatsConfigPrefix + sqID
|
||||
func (rs *RedisStorage) RemStatQueueProfile(sqID string) (err error) {
|
||||
key := utils.StatQueueProfilePrefix + sqID
|
||||
err = rs.Cmd("DEL", key).Err
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1945,14 +1945,14 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
|
||||
}
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Stats:")
|
||||
log.Print("StatQueueProfile:")
|
||||
}
|
||||
for _, tpST := range tpr.stats {
|
||||
st, err := APItoStats(tpST, tpr.timezone)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = tpr.dataStorage.SetStatsConfig(st); err != nil {
|
||||
if err = tpr.dataStorage.SetStatQueueProfile(st); err != nil {
|
||||
return err
|
||||
}
|
||||
if verbose {
|
||||
|
||||
@@ -32,13 +32,13 @@ type StatQueues []*StatQueue
|
||||
|
||||
// Sort is part of sort interface, sort based on Weight
|
||||
func (sis StatQueues) Sort() {
|
||||
sort.Slice(sis, func(i, j int) bool { return sis[i].cfg.Weight > sis[j].cfg.Weight })
|
||||
sort.Slice(sis, func(i, j int) bool { return sis[i].sqp.Weight > sis[j].sqp.Weight })
|
||||
}
|
||||
|
||||
// remWithID removes the queue with ID from slice
|
||||
func (sis StatQueues) remWithID(qID string) {
|
||||
for i, q := range sis {
|
||||
if q.cfg.ID == qID {
|
||||
if q.sqp.ID == qID {
|
||||
copy(sis[i:], sis[i+1:])
|
||||
sis[len(sis)-1] = nil
|
||||
sis = sis[:len(sis)-1]
|
||||
@@ -49,8 +49,8 @@ func (sis StatQueues) remWithID(qID string) {
|
||||
|
||||
// NewStatQueue instantiates a StatQueue
|
||||
func NewStatQueue(sec *StatsEventCache, ms engine.Marshaler,
|
||||
sqCfg *engine.StatsConfig, sqSM *engine.SQStoredMetrics) (si *StatQueue, err error) {
|
||||
si = &StatQueue{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)}
|
||||
sqCfg *engine.StatQueueProfile, sqSM *engine.SQStoredMetrics) (si *StatQueue, err error) {
|
||||
si = &StatQueue{sec: sec, ms: ms, sqp: sqCfg, sqMetrics: make(map[string]StatsMetric)}
|
||||
for _, metricID := range sqCfg.Metrics {
|
||||
if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil {
|
||||
return
|
||||
@@ -58,7 +58,7 @@ func NewStatQueue(sec *StatsEventCache, ms engine.Marshaler,
|
||||
}
|
||||
if sqSM != nil {
|
||||
for evID, ev := range sqSM.SEvents {
|
||||
si.sec.Cache(evID, ev, si.cfg.ID)
|
||||
si.sec.Cache(evID, ev, si.sqp.ID)
|
||||
}
|
||||
si.sqItems = sqSM.SQItems
|
||||
for metricID := range si.sqMetrics {
|
||||
@@ -85,7 +85,7 @@ type StatQueue struct {
|
||||
sqItems []*engine.SQItem
|
||||
sqMetrics map[string]StatsMetric
|
||||
ms engine.Marshaler // used to get/set Metrics
|
||||
cfg *engine.StatsConfig
|
||||
sqp *engine.StatQueueProfile
|
||||
}
|
||||
|
||||
// GetSQStoredMetrics retrieves the data used for store to DB
|
||||
@@ -152,10 +152,10 @@ func (sq *StatQueue) remExpired() {
|
||||
|
||||
// remOnQueueLength rems elements based on QueueLength setting
|
||||
func (sq *StatQueue) remOnQueueLength() {
|
||||
if sq.cfg.QueueLength == 0 {
|
||||
if sq.sqp.QueueLength == 0 {
|
||||
return
|
||||
}
|
||||
if len(sq.sqItems) == sq.cfg.QueueLength { // reached limit, rem first element
|
||||
if len(sq.sqItems) == sq.sqp.QueueLength { // reached limit, rem first element
|
||||
itm := sq.sqItems[0]
|
||||
sq.remEventWithID(itm.EventID)
|
||||
itm = nil
|
||||
|
||||
@@ -26,17 +26,17 @@ import (
|
||||
|
||||
func TestStatQueuesSort(t *testing.T) {
|
||||
sInsts := StatQueues{
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
|
||||
&StatQueue{sqp: &engine.StatQueueProfile{ID: "FIRST", Weight: 30.0}},
|
||||
&StatQueue{sqp: &engine.StatQueueProfile{ID: "SECOND", Weight: 40.0}},
|
||||
&StatQueue{sqp: &engine.StatQueueProfile{ID: "THIRD", Weight: 30.0}},
|
||||
&StatQueue{sqp: &engine.StatQueueProfile{ID: "FOURTH", Weight: 35.0}},
|
||||
}
|
||||
sInsts.Sort()
|
||||
eSInst := StatQueues{
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}},
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}},
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}},
|
||||
&StatQueue{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}},
|
||||
&StatQueue{sqp: &engine.StatQueueProfile{ID: "SECOND", Weight: 40.0}},
|
||||
&StatQueue{sqp: &engine.StatQueueProfile{ID: "FOURTH", Weight: 35.0}},
|
||||
&StatQueue{sqp: &engine.StatQueueProfile{ID: "FIRST", Weight: 30.0}},
|
||||
&StatQueue{sqp: &engine.StatQueueProfile{ID: "THIRD", Weight: 30.0}},
|
||||
}
|
||||
if !reflect.DeepEqual(eSInst, sInsts) {
|
||||
t.Errorf("expecting: %+v, received: %+v", eSInst, sInsts)
|
||||
|
||||
@@ -39,16 +39,16 @@ func init() {
|
||||
func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval time.Duration) (ss *StatService, err error) {
|
||||
ss = &StatService{dataDB: dataDB, ms: ms, storeInterval: storeInterval,
|
||||
stopStoring: make(chan struct{}), evCache: NewStatsEventCache()}
|
||||
sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsConfigPrefix)
|
||||
sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatQueueProfilePrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ss.queuesCache = make(map[string]*StatQueue)
|
||||
ss.queues = make(StatQueues, 0)
|
||||
for _, prfx := range sqPrfxs {
|
||||
if q, err := ss.loadQueue(prfx[len(utils.StatsConfigPrefix):]); err != nil {
|
||||
if q, err := ss.loadQueue(prfx[len(utils.StatQueueProfilePrefix):]); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<StatS> failed loading quueue with id: <%s>, err: <%s>",
|
||||
q.cfg.ID, err.Error()))
|
||||
q.sqp.ID, err.Error()))
|
||||
continue
|
||||
} else {
|
||||
ss.setQueue(q)
|
||||
@@ -92,7 +92,7 @@ func (ss *StatService) Shutdown() error {
|
||||
// setQueue adds or modifies a queue into cache
|
||||
// sort will reorder the ss.queues
|
||||
func (ss *StatService) loadQueue(qID string) (q *StatQueue, err error) {
|
||||
sq, err := ss.dataDB.GetStatsConfig(qID)
|
||||
sq, err := ss.dataDB.GetStatQueueProfile(qID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -106,7 +106,7 @@ func (ss *StatService) loadQueue(qID string) (q *StatQueue, err error) {
|
||||
}
|
||||
|
||||
func (ss *StatService) setQueue(q *StatQueue) {
|
||||
ss.queuesCache[q.cfg.ID] = q
|
||||
ss.queuesCache[q.sqp.ID] = q
|
||||
ss.queues = append(ss.queues, q)
|
||||
}
|
||||
|
||||
@@ -121,14 +121,14 @@ func (ss *StatService) remQueue(qID string) (si *StatQueue) {
|
||||
// store stores the necessary storedMetrics to dataDB
|
||||
func (ss *StatService) storeMetrics() {
|
||||
for _, si := range ss.queues {
|
||||
if !si.cfg.Store || !si.dirty { // no need to save
|
||||
if !si.sqp.Store || !si.dirty { // no need to save
|
||||
continue
|
||||
}
|
||||
if siSM := si.GetStoredMetrics(); siSM != nil {
|
||||
if err := ss.dataDB.SetSQStoredMetrics(siSM); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<StatService> failed saving StoredMetrics for QueueID: %s, error: %s",
|
||||
si.cfg.ID, err.Error()))
|
||||
si.sqp.ID, err.Error()))
|
||||
}
|
||||
}
|
||||
// randomize the CPU load and give up thread control
|
||||
@@ -159,9 +159,9 @@ func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) {
|
||||
if err := stInst.ProcessEvent(ev); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<StatService> QueueID: %s, ignoring event with ID: %s, error: %s",
|
||||
stInst.cfg.ID, evStatsID, err.Error()))
|
||||
stInst.sqp.ID, evStatsID, err.Error()))
|
||||
}
|
||||
if stInst.cfg.Blocker {
|
||||
if stInst.sqp.Blocker {
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -225,13 +225,13 @@ type ArgsLoadQueues struct {
|
||||
func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) {
|
||||
qIDs := args.QueueIDs
|
||||
if qIDs == nil {
|
||||
sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsConfigPrefix)
|
||||
sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatQueueProfilePrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
queueIDs := make([]string, len(sqPrfxs))
|
||||
for i, prfx := range sqPrfxs {
|
||||
queueIDs[i] = prfx[len(utils.StatsConfigPrefix):]
|
||||
queueIDs[i] = prfx[len(utils.StatQueueProfilePrefix):]
|
||||
}
|
||||
if len(queueIDs) != 0 {
|
||||
qIDs = &queueIDs
|
||||
@@ -247,7 +247,7 @@ func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err err
|
||||
}
|
||||
if q, err := ss.loadQueue(qID); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<StatS> failed loading quueue with id: <%s>, err: <%s>",
|
||||
q.cfg.ID, err.Error()))
|
||||
q.sqp.ID, err.Error()))
|
||||
continue
|
||||
} else {
|
||||
sQs = append(sQs, q)
|
||||
|
||||
@@ -32,8 +32,8 @@ func TestReqFilterPassStatS(t *testing.T) {
|
||||
config.SetCgrConfig(cgrCfg)
|
||||
}
|
||||
dataStorage, _ := engine.NewMapStorage()
|
||||
dataStorage.SetStatsConfig(
|
||||
&engine.StatsConfig{ID: "CDRST1",
|
||||
dataStorage.SetStatQueueProfile(
|
||||
&engine.StatQueueProfile{ID: "CDRST1",
|
||||
Filters: []*engine.RequestFilter{
|
||||
&engine.RequestFilter{Type: engine.MetaString, FieldName: "Tenant",
|
||||
Values: []string{"cgrates.org"}}},
|
||||
|
||||
@@ -257,7 +257,7 @@ const (
|
||||
LOG_CDR = "cdr_"
|
||||
LOG_MEDIATED_CDR = "mcd_"
|
||||
SQStoredMetricsPrefix = "ssm_"
|
||||
StatsConfigPrefix = "scf_"
|
||||
StatQueueProfilePrefix = "sqp_"
|
||||
ThresholdCfgPrefix = "thc_"
|
||||
LOADINST_KEY = "load_history"
|
||||
SESSION_MANAGER_SOURCE = "SMR"
|
||||
|
||||
Reference in New Issue
Block a user