diff --git a/apier/v1/stats.go b/apier/v1/stats.go index 5af1ae918..c3b90c148 100644 --- a/apier/v1/stats.go +++ b/apier/v1/stats.go @@ -93,11 +93,11 @@ type AttrGetStatsCfg struct { } //GetStatConfig returns a stat configuration -func (apierV1 *ApierV1) GetStatConfig(attr AttrGetStatsCfg, reply *engine.StatsConfig) error { +func (apierV1 *ApierV1) GetStatQueueProfile(attr AttrGetStatsCfg, reply *engine.StatQueueProfile) error { if missing := utils.MissingStructFields(&attr, []string{"ID"}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } - if sCfg, err := apierV1.DataDB.GetStatsConfig(attr.ID); err != nil { + if sCfg, err := apierV1.DataDB.GetStatQueueProfile(attr.ID); err != nil { if err.Error() != utils.ErrNotFound.Error() { err = utils.NewErrServerError(err) } @@ -109,11 +109,11 @@ func (apierV1 *ApierV1) GetStatConfig(attr AttrGetStatsCfg, reply *engine.StatsC } //SetStatConfig add a new stat configuration -func (apierV1 *ApierV1) SetStatConfig(attr *engine.StatsConfig, reply *string) error { +func (apierV1 *ApierV1) SetStatQueueProfile(attr *engine.StatQueueProfile, reply *string) error { if missing := utils.MissingStructFields(attr, []string{"ID"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } - if err := apierV1.DataDB.SetStatsConfig(attr); err != nil { + if err := apierV1.DataDB.SetStatQueueProfile(attr); err != nil { return utils.APIErrorHandler(err) } *reply = utils.OK @@ -122,11 +122,11 @@ func (apierV1 *ApierV1) SetStatConfig(attr *engine.StatsConfig, reply *string) e } //Remove a specific stat configuration -func (apierV1 *ApierV1) RemStatConfig(attrs AttrGetStatsCfg, reply *string) error { +func (apierV1 *ApierV1) RemStatQueueProfile(attrs AttrGetStatsCfg, reply *string) error { if missing := utils.MissingStructFields(&attrs, []string{"ID"}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } - if err := apierV1.DataDB.RemStatsConfig(attrs.ID); err != nil { + if err := apierV1.DataDB.RemStatQueueProfile(attrs.ID); err != nil { if err.Error() != utils.ErrNotFound.Error() { err = utils.NewErrServerError(err) } diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index f230ed026..92bb59c34 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -37,7 +37,7 @@ var ( stsV1CfgPath string stsV1Cfg *config.CGRConfig stsV1Rpc *rpc.Client - statConfig *engine.StatsConfig + statConfig *engine.StatQueueProfile stsV1ConfDIR string //run tests for specific configuration statsDelay int ) @@ -66,13 +66,13 @@ var sTestsStatSV1 = []func(t *testing.T){ testV1STSFromFolder, testV1STSGetStats, testV1STSProcessEvent, - testV1STSGetStatConfigBeforeSet, - testV1STSSetStatConfig, - testV1STSGetStatAfterSet, - testV1STSUpdateStatConfig, - testV1STSGetStatAfterUpdate, - testV1STSRemoveStatConfig, - testV1STSGetStatConfigAfterRemove, + testV1STSGetStatQueueProfileBeforeSet, + testV1STSSetStatQueueProfile, + testV1STSGetStatQueueProfileAfterSet, + testV1STSUpdateStatQueueProfile, + testV1STSGetStatQueueProfileAfterUpdate, + testV1STSRemoveStatQueueProfile, + testV1STSGetStatQueueProfileAfterRemove, testV1STSStopEngine, } @@ -209,15 +209,15 @@ func testV1STSProcessEvent(t *testing.T) { } } -func testV1STSGetStatConfigBeforeSet(t *testing.T) { - var reply *engine.StatsConfig - if err := stsV1Rpc.Call("ApierV1.GetStatConfig", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { +func testV1STSGetStatQueueProfileBeforeSet(t *testing.T) { + var reply *engine.StatQueueProfile + if err := stsV1Rpc.Call("ApierV1.GetStatQueueProfile", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } -func testV1STSSetStatConfig(t *testing.T) { - statConfig = &engine.StatsConfig{ +func testV1STSSetStatQueueProfile(t *testing.T) { + statConfig = &engine.StatQueueProfile{ ID: "SCFG1", Filters: []*engine.RequestFilter{ &engine.RequestFilter{ @@ -240,23 +240,23 @@ func testV1STSSetStatConfig(t *testing.T) { Weight: 20, } var result string - if err := stsV1Rpc.Call("ApierV1.SetStatConfig", statConfig, &result); err != nil { + if err := stsV1Rpc.Call("ApierV1.SetStatQueueProfile", statConfig, &result); err != nil { t.Error(err) } else if result != utils.OK { t.Error("Unexpected reply returned", result) } } -func testV1STSGetStatAfterSet(t *testing.T) { - var reply *engine.StatsConfig - if err := stsV1Rpc.Call("ApierV1.GetStatConfig", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err != nil { +func testV1STSGetStatQueueProfileAfterSet(t *testing.T) { + var reply *engine.StatQueueProfile + if err := stsV1Rpc.Call("ApierV1.GetStatQueueProfile", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err != nil { t.Error(err) } else if !reflect.DeepEqual(statConfig, reply) { t.Errorf("Expecting: %+v, received: %+v", statConfig, reply) } } -func testV1STSUpdateStatConfig(t *testing.T) { +func testV1STSUpdateStatQueueProfile(t *testing.T) { var result string statConfig.Filters = []*engine.RequestFilter{ &engine.RequestFilter{ @@ -275,34 +275,34 @@ func testV1STSUpdateStatConfig(t *testing.T) { Values: []string{"10", "20"}, }, } - if err := stsV1Rpc.Call("ApierV1.SetStatConfig", statConfig, &result); err != nil { + if err := stsV1Rpc.Call("ApierV1.SetStatQueueProfile", statConfig, &result); err != nil { t.Error(err) } else if result != utils.OK { t.Error("Unexpected reply returned", result) } } -func testV1STSGetStatAfterUpdate(t *testing.T) { - var reply *engine.StatsConfig - if err := stsV1Rpc.Call("ApierV1.GetStatConfig", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err != nil { +func testV1STSGetStatQueueProfileAfterUpdate(t *testing.T) { + var reply *engine.StatQueueProfile + if err := stsV1Rpc.Call("ApierV1.GetStatQueueProfile", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err != nil { t.Error(err) } else if !reflect.DeepEqual(statConfig, reply) { t.Errorf("Expecting: %+v, received: %+v", statConfig, reply) } } -func testV1STSRemoveStatConfig(t *testing.T) { +func testV1STSRemoveStatQueueProfile(t *testing.T) { var resp string - if err := stsV1Rpc.Call("ApierV1.RemStatConfig", &AttrGetStatsCfg{ID: "SCFG1"}, &resp); err != nil { + if err := stsV1Rpc.Call("ApierV1.RemStatQueueProfile", &AttrGetStatsCfg{ID: "SCFG1"}, &resp); err != nil { t.Error(err) } else if resp != utils.OK { t.Error("Unexpected reply returned", resp) } } -func testV1STSGetStatConfigAfterRemove(t *testing.T) { - var reply *engine.StatsConfig - if err := stsV1Rpc.Call("ApierV1.GetStatConfig", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { +func testV1STSGetStatQueueProfileAfterRemove(t *testing.T) { + var reply *engine.StatQueueProfile + if err := stsV1Rpc.Call("ApierV1.GetStatQueueProfile", &AttrGetStatsCfg{ID: "SCFG1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } diff --git a/apier/v1/tpstats_it_test.go b/apier/v1/tpstats_it_test.go index 8c39b9539..12fefc3a3 100644 --- a/apier/v1/tpstats_it_test.go +++ b/apier/v1/tpstats_it_test.go @@ -87,7 +87,7 @@ func testTPStatsInitCfg(t *testing.T) { tpStatCfg.DataFolderPath = tpStatDataDir // Share DataFolderPath through config towards StoreDb for Flush() config.SetCgrConfig(tpStatCfg) switch tpStatConfigDIR { - case "tutmongo": // Mongo needs more time to reset db, need to investigate + case "tutmongo": // Mongo needs more time to reset db tpStatDelay = 4000 default: tpStatDelay = 1000 diff --git a/engine/libstats.go b/engine/libstats.go index aad179ba9..539dbdf77 100755 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -39,7 +39,7 @@ type SQStoredMetrics struct { } // StatsConfig represents the configuration of a StatsInstance in StatS -type StatsConfig struct { +type StatQueueProfile struct { ID string // QueueID Filters []*RequestFilter ActivationInterval *utils.ActivationInterval // Activation interval diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go index a5a5cb61d..168541ef5 100755 --- a/engine/loader_it_test.go +++ b/engine/loader_it_test.go @@ -324,7 +324,7 @@ func TestLoaderITWriteToDatabase(t *testing.T) { } for k, st := range loader.stats { - rcv, err := loader.dataStorage.GetStatsConfig(k) + rcv, err := loader.dataStorage.GetStatQueueProfile(k) if err != nil { t.Error("Failed GetStatsQueue: ", err.Error()) } diff --git a/engine/model_helpers.go b/engine/model_helpers.go index c26aca1e6..40e2f3c97 100755 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -2069,8 +2069,8 @@ func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) { return } -func APItoStats(tpST *utils.TPStats, timezone string) (st *StatsConfig, err error) { - st = &StatsConfig{ +func APItoStats(tpST *utils.TPStats, timezone string) (st *StatQueueProfile, err error) { + st = &StatQueueProfile{ ID: tpST.ID, QueueLength: tpST.QueueLength, Weight: tpST.Weight, diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index 7533150e9..ee9fba139 100755 --- a/engine/model_helpers_test.go +++ b/engine/model_helpers_test.go @@ -901,7 +901,7 @@ func TestAPItoTPStats(t *testing.T) { Weight: 20.0, } - eTPs := &StatsConfig{ID: tps.ID, + eTPs := &StatQueueProfile{ID: tps.ID, QueueLength: tps.QueueLength, Metrics: []string{"*asr", "*acd", "*acc"}, Thresholds: []string{"THRESH1", "THRESH2"}, diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index bae70a391..1faa13d19 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -90,7 +90,7 @@ var sTestsOnStorIT = []func(t *testing.T){ testOnStorITCRUDHistory, testOnStorITCRUDStructVersion, testOnStorITCRUDSQStoredMetrics, - testOnStorITCRUDStats, + testOnStorITCRUDStatQueueProfile, testOnStorITCRUDThresholdCfg, } @@ -1973,9 +1973,9 @@ func testOnStorITCRUDSQStoredMetrics(t *testing.T) { } } -func testOnStorITCRUDStats(t *testing.T) { +func testOnStorITCRUDStatQueueProfile(t *testing.T) { timeTTL := time.Duration(0 * time.Second) - sq := &StatsConfig{ + sq := &StatQueueProfile{ ID: "test", ActivationInterval: &utils.ActivationInterval{}, Filters: []*RequestFilter{}, @@ -1985,33 +1985,33 @@ func testOnStorITCRUDStats(t *testing.T) { Store: true, Thresholds: []string{}, } - if _, rcvErr := onStor.GetStatsConfig(sq.ID); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetStatQueueProfile(sq.ID); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if _, ok := cache.Get(utils.StatsConfigPrefix + sq.ID); ok != false { + if _, ok := cache.Get(utils.StatQueueProfilePrefix + sq.ID); ok != false { t.Error("Should not be in cache") } - if err := onStor.SetStatsConfig(sq); err != nil { + if err := onStor.SetStatQueueProfile(sq); err != nil { t.Error(err) } - if _, ok := cache.Get(utils.StatsConfigPrefix + sq.ID); ok != false { + if _, ok := cache.Get(utils.StatQueueProfilePrefix + sq.ID); ok != false { t.Error("Should not be in cache") } - if rcv, err := onStor.GetStatsConfig(sq.ID); err != nil { + if rcv, err := onStor.GetStatQueueProfile(sq.ID); err != nil { t.Error(err) } else if !reflect.DeepEqual(sq, rcv) { t.Errorf("Expecting: %v, received: %v", sq, rcv) } - if _, ok := cache.Get(utils.StatsConfigPrefix + sq.ID); ok != false { + if _, ok := cache.Get(utils.StatQueueProfilePrefix + sq.ID); ok != false { t.Error("Should not be in cache") } - if err := onStor.RemStatsConfig(sq.ID); err != nil { + if err := onStor.RemStatQueueProfile(sq.ID); err != nil { t.Error(err) } - if _, ok := cache.Get(utils.StatsConfigPrefix + sq.ID); ok != false { + if _, ok := cache.Get(utils.StatQueueProfilePrefix + sq.ID); ok != false { t.Error("Should not be in cache") } - if _, rcvErr := onStor.GetStatsConfig(sq.ID); rcvErr != utils.ErrNotFound { + if _, rcvErr := onStor.GetStatQueueProfile(sq.ID); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 39ce86773..115d153ba 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -112,9 +112,9 @@ type DataDB interface { GetReqFilterIndexes(dbKey string) (indexes map[string]map[string]utils.StringMap, err error) SetReqFilterIndexes(dbKey string, indexes map[string]map[string]utils.StringMap) (err error) MatchReqFilterIndex(dbKey, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) - GetStatsConfig(sqID string) (sq *StatsConfig, err error) - SetStatsConfig(sq *StatsConfig) (err error) - RemStatsConfig(sqID string) (err error) + GetStatQueueProfile(sqID string) (sq *StatQueueProfile, err error) + SetStatQueueProfile(sq *StatQueueProfile) (err error) + RemStatQueueProfile(sqID string) (err error) GetSQStoredMetrics(sqID string) (sqSM *SQStoredMetrics, err error) SetSQStoredMetrics(sqSM *SQStoredMetrics) (err error) RemSQStoredMetrics(sqID string) (err error) diff --git a/engine/storage_map.go b/engine/storage_map.go index d1cc2240b..6fc777de5 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1472,10 +1472,10 @@ func (ms *MapStorage) MatchReqFilterIndex(dbKey, fldName, fldVal string) (itemID } // GetStatsQueue retrieves a StatsQueue from dataDB -func (ms *MapStorage) GetStatsConfig(sqID string) (scf *StatsConfig, err error) { +func (ms *MapStorage) GetStatQueueProfile(sqID string) (scf *StatQueueProfile, err error) { ms.mu.RLock() defer ms.mu.RUnlock() - key := utils.StatsConfigPrefix + sqID + key := utils.StatQueueProfilePrefix + sqID values, ok := ms.dict[key] if !ok { return nil, utils.ErrNotFound @@ -1493,22 +1493,22 @@ func (ms *MapStorage) GetStatsConfig(sqID string) (scf *StatsConfig, err error) } // SetStatsQueue stores a StatsQueue into DataDB -func (ms *MapStorage) SetStatsConfig(scf *StatsConfig) (err error) { +func (ms *MapStorage) SetStatQueueProfile(scf *StatQueueProfile) (err error) { ms.mu.Lock() defer ms.mu.Unlock() result, err := ms.ms.Marshal(scf) if err != nil { return err } - ms.dict[utils.StatsConfigPrefix+scf.ID] = result + ms.dict[utils.StatQueueProfilePrefix+scf.ID] = result return } // RemStatsQueue removes a StatsQueue from dataDB -func (ms *MapStorage) RemStatsConfig(scfID string) (err error) { +func (ms *MapStorage) RemStatQueueProfile(scfID string) (err error) { ms.mu.Lock() defer ms.mu.Unlock() - key := utils.StatsConfigPrefix + scfID + key := utils.StatQueueProfilePrefix + scfID delete(ms.dict, key) return } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 75c0f9aab..898e151d1 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -57,7 +57,7 @@ const ( colLht = "load_history" colVer = "versions" colRsP = "resource_profiles" - colSts = "stats" + colSqp = "stat_queue_profiles" colRFI = "request_filter_indexes" colTmg = "timings" colRes = "resources" @@ -325,7 +325,7 @@ func (ms *MongoStorage) getColNameForPrefix(prefix string) (name string, ok bool utils.LOADINST_KEY: colLht, utils.VERSION_PREFIX: colVer, utils.ResourceProfilesPrefix: colRsP, - utils.StatsPrefix: colSts, + utils.StatsPrefix: colStq, utils.TimingsPrefix: colTmg, utils.ResourcesPrefix: colRes, } @@ -631,14 +631,14 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er result = append(result, utils.ResourcesPrefix+idResult.Id) } case utils.StatsPrefix: - iter := db.C(colSts).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter() + iter := db.C(colStq).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter() for iter.Next(&idResult) { result = append(result, utils.StatsPrefix+idResult.Id) } - case utils.StatsConfigPrefix: - iter := db.C(colStq).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter() + case utils.StatQueueProfilePrefix: + iter := db.C(colSqp).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter() for iter.Next(&idResult) { - result = append(result, utils.StatsConfigPrefix+idResult.Id) + result = append(result, utils.StatQueueProfilePrefix+idResult.Id) } case utils.AccountActionPlansPrefix: iter := db.C(colAAp).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter() @@ -2008,10 +2008,10 @@ func (ms *MongoStorage) MatchReqFilterIndex(dbKey, fldName, fldVal string) (item } // GetStatsQueue retrieves a StatsQueue from dataDB -func (ms *MongoStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) { - session, col := ms.conn(utils.StatsConfigPrefix) +func (ms *MongoStorage) GetStatQueueProfile(sqID string) (sq *StatQueueProfile, err error) { + session, col := ms.conn(utils.StatQueueProfilePrefix) defer session.Close() - sq = new(StatsConfig) + sq = new(StatQueueProfile) if err = col.Find(bson.M{"id": sqID}).One(&sq); err != nil { if err == mgo.ErrNotFound { err = utils.ErrNotFound @@ -2027,16 +2027,16 @@ func (ms *MongoStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) } // SetStatsQueue stores a StatsQueue into DataDB -func (ms *MongoStorage) SetStatsConfig(sq *StatsConfig) (err error) { - session, col := ms.conn(utils.StatsConfigPrefix) +func (ms *MongoStorage) SetStatQueueProfile(sq *StatQueueProfile) (err error) { + session, col := ms.conn(utils.StatQueueProfilePrefix) defer session.Close() _, err = col.UpsertId(bson.M{"id": sq.ID}, sq) return } // RemStatsQueue removes a StatsQueue from dataDB -func (ms *MongoStorage) RemStatsConfig(sqID string) (err error) { - session, col := ms.conn(utils.StatsConfigPrefix) +func (ms *MongoStorage) RemStatQueueProfile(sqID string) (err error) { + session, col := ms.conn(utils.StatQueueProfilePrefix) err = col.Remove(bson.M{"id": sqID}) if err != nil { return err diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 73cdcbb04..6561f453a 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1583,8 +1583,8 @@ func (rs *RedisStorage) RemoveVersions(vrs Versions) (err error) { } // GetStatsConfig retrieves a StatsConfig from dataDB -func (rs *RedisStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) { - key := utils.StatsConfigPrefix + sqID +func (rs *RedisStorage) GetStatQueueProfile(sqID string) (sq *StatQueueProfile, err error) { + key := utils.StatQueueProfilePrefix + sqID var values []byte if values, err = rs.Cmd("GET", key).Bytes(); err != nil { if err == redis.ErrRespNil { @@ -1604,18 +1604,18 @@ func (rs *RedisStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) } // SetStatsQueue stores a StatsQueue into DataDB -func (rs *RedisStorage) SetStatsConfig(sq *StatsConfig) (err error) { +func (rs *RedisStorage) SetStatQueueProfile(sq *StatQueueProfile) (err error) { var result []byte result, err = rs.ms.Marshal(sq) if err != nil { return } - return rs.Cmd("SET", utils.StatsConfigPrefix+sq.ID, result).Err + return rs.Cmd("SET", utils.StatQueueProfilePrefix+sq.ID, result).Err } // RemStatsQueue removes a StatsQueue from dataDB -func (rs *RedisStorage) RemStatsConfig(sqID string) (err error) { - key := utils.StatsConfigPrefix + sqID +func (rs *RedisStorage) RemStatQueueProfile(sqID string) (err error) { + key := utils.StatQueueProfilePrefix + sqID err = rs.Cmd("DEL", key).Err return } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index fc734ebc3..6b08b1419 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -1945,14 +1945,14 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err } } if verbose { - log.Print("Stats:") + log.Print("StatQueueProfile:") } for _, tpST := range tpr.stats { st, err := APItoStats(tpST, tpr.timezone) if err != nil { return err } - if err = tpr.dataStorage.SetStatsConfig(st); err != nil { + if err = tpr.dataStorage.SetStatQueueProfile(st); err != nil { return err } if verbose { diff --git a/stats/queue.go b/stats/queue.go index 18f9f3064..afadef9b9 100644 --- a/stats/queue.go +++ b/stats/queue.go @@ -32,13 +32,13 @@ type StatQueues []*StatQueue // Sort is part of sort interface, sort based on Weight func (sis StatQueues) Sort() { - sort.Slice(sis, func(i, j int) bool { return sis[i].cfg.Weight > sis[j].cfg.Weight }) + sort.Slice(sis, func(i, j int) bool { return sis[i].sqp.Weight > sis[j].sqp.Weight }) } // remWithID removes the queue with ID from slice func (sis StatQueues) remWithID(qID string) { for i, q := range sis { - if q.cfg.ID == qID { + if q.sqp.ID == qID { copy(sis[i:], sis[i+1:]) sis[len(sis)-1] = nil sis = sis[:len(sis)-1] @@ -49,8 +49,8 @@ func (sis StatQueues) remWithID(qID string) { // NewStatQueue instantiates a StatQueue func NewStatQueue(sec *StatsEventCache, ms engine.Marshaler, - sqCfg *engine.StatsConfig, sqSM *engine.SQStoredMetrics) (si *StatQueue, err error) { - si = &StatQueue{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)} + sqCfg *engine.StatQueueProfile, sqSM *engine.SQStoredMetrics) (si *StatQueue, err error) { + si = &StatQueue{sec: sec, ms: ms, sqp: sqCfg, sqMetrics: make(map[string]StatsMetric)} for _, metricID := range sqCfg.Metrics { if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { return @@ -58,7 +58,7 @@ func NewStatQueue(sec *StatsEventCache, ms engine.Marshaler, } if sqSM != nil { for evID, ev := range sqSM.SEvents { - si.sec.Cache(evID, ev, si.cfg.ID) + si.sec.Cache(evID, ev, si.sqp.ID) } si.sqItems = sqSM.SQItems for metricID := range si.sqMetrics { @@ -85,7 +85,7 @@ type StatQueue struct { sqItems []*engine.SQItem sqMetrics map[string]StatsMetric ms engine.Marshaler // used to get/set Metrics - cfg *engine.StatsConfig + sqp *engine.StatQueueProfile } // GetSQStoredMetrics retrieves the data used for store to DB @@ -152,10 +152,10 @@ func (sq *StatQueue) remExpired() { // remOnQueueLength rems elements based on QueueLength setting func (sq *StatQueue) remOnQueueLength() { - if sq.cfg.QueueLength == 0 { + if sq.sqp.QueueLength == 0 { return } - if len(sq.sqItems) == sq.cfg.QueueLength { // reached limit, rem first element + if len(sq.sqItems) == sq.sqp.QueueLength { // reached limit, rem first element itm := sq.sqItems[0] sq.remEventWithID(itm.EventID) itm = nil diff --git a/stats/queue_test.go b/stats/queue_test.go index 560a1be2e..dd98308ac 100644 --- a/stats/queue_test.go +++ b/stats/queue_test.go @@ -26,17 +26,17 @@ import ( func TestStatQueuesSort(t *testing.T) { sInsts := StatQueues{ - &StatQueue{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}}, - &StatQueue{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}}, - &StatQueue{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}}, - &StatQueue{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}}, + &StatQueue{sqp: &engine.StatQueueProfile{ID: "FIRST", Weight: 30.0}}, + &StatQueue{sqp: &engine.StatQueueProfile{ID: "SECOND", Weight: 40.0}}, + &StatQueue{sqp: &engine.StatQueueProfile{ID: "THIRD", Weight: 30.0}}, + &StatQueue{sqp: &engine.StatQueueProfile{ID: "FOURTH", Weight: 35.0}}, } sInsts.Sort() eSInst := StatQueues{ - &StatQueue{cfg: &engine.StatsConfig{ID: "SECOND", Weight: 40.0}}, - &StatQueue{cfg: &engine.StatsConfig{ID: "FOURTH", Weight: 35.0}}, - &StatQueue{cfg: &engine.StatsConfig{ID: "FIRST", Weight: 30.0}}, - &StatQueue{cfg: &engine.StatsConfig{ID: "THIRD", Weight: 30.0}}, + &StatQueue{sqp: &engine.StatQueueProfile{ID: "SECOND", Weight: 40.0}}, + &StatQueue{sqp: &engine.StatQueueProfile{ID: "FOURTH", Weight: 35.0}}, + &StatQueue{sqp: &engine.StatQueueProfile{ID: "FIRST", Weight: 30.0}}, + &StatQueue{sqp: &engine.StatQueueProfile{ID: "THIRD", Weight: 30.0}}, } if !reflect.DeepEqual(eSInst, sInsts) { t.Errorf("expecting: %+v, received: %+v", eSInst, sInsts) diff --git a/stats/service.go b/stats/service.go index 5e4417a0a..12660383e 100755 --- a/stats/service.go +++ b/stats/service.go @@ -39,16 +39,16 @@ func init() { func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval time.Duration) (ss *StatService, err error) { ss = &StatService{dataDB: dataDB, ms: ms, storeInterval: storeInterval, stopStoring: make(chan struct{}), evCache: NewStatsEventCache()} - sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsConfigPrefix) + sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatQueueProfilePrefix) if err != nil { return nil, err } ss.queuesCache = make(map[string]*StatQueue) ss.queues = make(StatQueues, 0) for _, prfx := range sqPrfxs { - if q, err := ss.loadQueue(prfx[len(utils.StatsConfigPrefix):]); err != nil { + if q, err := ss.loadQueue(prfx[len(utils.StatQueueProfilePrefix):]); err != nil { utils.Logger.Err(fmt.Sprintf(" failed loading quueue with id: <%s>, err: <%s>", - q.cfg.ID, err.Error())) + q.sqp.ID, err.Error())) continue } else { ss.setQueue(q) @@ -92,7 +92,7 @@ func (ss *StatService) Shutdown() error { // setQueue adds or modifies a queue into cache // sort will reorder the ss.queues func (ss *StatService) loadQueue(qID string) (q *StatQueue, err error) { - sq, err := ss.dataDB.GetStatsConfig(qID) + sq, err := ss.dataDB.GetStatQueueProfile(qID) if err != nil { return nil, err } @@ -106,7 +106,7 @@ func (ss *StatService) loadQueue(qID string) (q *StatQueue, err error) { } func (ss *StatService) setQueue(q *StatQueue) { - ss.queuesCache[q.cfg.ID] = q + ss.queuesCache[q.sqp.ID] = q ss.queues = append(ss.queues, q) } @@ -121,14 +121,14 @@ func (ss *StatService) remQueue(qID string) (si *StatQueue) { // store stores the necessary storedMetrics to dataDB func (ss *StatService) storeMetrics() { for _, si := range ss.queues { - if !si.cfg.Store || !si.dirty { // no need to save + if !si.sqp.Store || !si.dirty { // no need to save continue } if siSM := si.GetStoredMetrics(); siSM != nil { if err := ss.dataDB.SetSQStoredMetrics(siSM); err != nil { utils.Logger.Warning( fmt.Sprintf(" failed saving StoredMetrics for QueueID: %s, error: %s", - si.cfg.ID, err.Error())) + si.sqp.ID, err.Error())) } } // randomize the CPU load and give up thread control @@ -159,9 +159,9 @@ func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) { if err := stInst.ProcessEvent(ev); err != nil { utils.Logger.Warning( fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s", - stInst.cfg.ID, evStatsID, err.Error())) + stInst.sqp.ID, evStatsID, err.Error())) } - if stInst.cfg.Blocker { + if stInst.sqp.Blocker { break } } @@ -225,13 +225,13 @@ type ArgsLoadQueues struct { func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) { qIDs := args.QueueIDs if qIDs == nil { - sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsConfigPrefix) + sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatQueueProfilePrefix) if err != nil { return err } queueIDs := make([]string, len(sqPrfxs)) for i, prfx := range sqPrfxs { - queueIDs[i] = prfx[len(utils.StatsConfigPrefix):] + queueIDs[i] = prfx[len(utils.StatQueueProfilePrefix):] } if len(queueIDs) != 0 { qIDs = &queueIDs @@ -247,7 +247,7 @@ func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err err } if q, err := ss.loadQueue(qID); err != nil { utils.Logger.Err(fmt.Sprintf(" failed loading quueue with id: <%s>, err: <%s>", - q.cfg.ID, err.Error())) + q.sqp.ID, err.Error())) continue } else { sQs = append(sQs, q) diff --git a/stats/service_test.go b/stats/service_test.go index b0b580017..2cbf9b90d 100644 --- a/stats/service_test.go +++ b/stats/service_test.go @@ -32,8 +32,8 @@ func TestReqFilterPassStatS(t *testing.T) { config.SetCgrConfig(cgrCfg) } dataStorage, _ := engine.NewMapStorage() - dataStorage.SetStatsConfig( - &engine.StatsConfig{ID: "CDRST1", + dataStorage.SetStatQueueProfile( + &engine.StatQueueProfile{ID: "CDRST1", Filters: []*engine.RequestFilter{ &engine.RequestFilter{Type: engine.MetaString, FieldName: "Tenant", Values: []string{"cgrates.org"}}}, diff --git a/utils/consts.go b/utils/consts.go index f223a9c7f..aba116bb0 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -257,7 +257,7 @@ const ( LOG_CDR = "cdr_" LOG_MEDIATED_CDR = "mcd_" SQStoredMetricsPrefix = "ssm_" - StatsConfigPrefix = "scf_" + StatQueueProfilePrefix = "sqp_" ThresholdCfgPrefix = "thc_" LOADINST_KEY = "load_history" SESSION_MANAGER_SOURCE = "SMR"