mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-13 19:56:38 +05:00
StatsV1.GetQueueIDs API, add integration tests for StatS
This commit is contained in:
@@ -100,6 +100,11 @@ func (stsv1 *StatSV1) Call(serviceMethod string, args interface{}, reply interfa
|
||||
return err
|
||||
}
|
||||
|
||||
// GetQueueIDs returns list of queueIDs registered for a tenant
|
||||
func (stsv1 *StatSV1) GetQueueIDs(tenant string, qIDs *[]string) error {
|
||||
return stsv1.sS.V1GetQueueIDs(tenant, qIDs)
|
||||
}
|
||||
|
||||
// ProcessEvent returns processes a new Event
|
||||
func (stsv1 *StatSV1) ProcessEvent(ev *engine.StatEvent, reply *string) error {
|
||||
return stsv1.sS.V1ProcessEvent(ev, reply)
|
||||
|
||||
@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package v1
|
||||
|
||||
/*
|
||||
import (
|
||||
"math/rand"
|
||||
"net/rpc"
|
||||
@@ -43,16 +42,22 @@ var (
|
||||
statsDelay int
|
||||
)
|
||||
|
||||
var evs = []engine.StatsEvent{
|
||||
engine.StatsEvent{
|
||||
utils.ID: "event1",
|
||||
utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local()},
|
||||
engine.StatsEvent{
|
||||
utils.ID: "event2",
|
||||
utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local()},
|
||||
engine.StatsEvent{
|
||||
utils.ID: "event3",
|
||||
utils.SETUP_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local()},
|
||||
var evs = []*engine.StatEvent{
|
||||
&engine.StatEvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event1",
|
||||
Fields: map[string]interface{}{
|
||||
utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local()}},
|
||||
&engine.StatEvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event2",
|
||||
Fields: map[string]interface{}{
|
||||
utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local()}},
|
||||
&engine.StatEvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "event3",
|
||||
Fields: map[string]interface{}{
|
||||
utils.SETUP_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local()}},
|
||||
}
|
||||
|
||||
func init() {
|
||||
@@ -66,15 +71,15 @@ var sTestsStatSV1 = []func(t *testing.T){
|
||||
testV1STSRpcConn,
|
||||
testV1STSFromFolder,
|
||||
testV1STSGetStats,
|
||||
testV1STSProcessEvent,
|
||||
testV1STSGetStatQueueProfileBeforeSet,
|
||||
testV1STSSetStatQueueProfile,
|
||||
testV1STSGetStatQueueProfileAfterSet,
|
||||
testV1STSUpdateStatQueueProfile,
|
||||
testV1STSGetStatQueueProfileAfterUpdate,
|
||||
testV1STSRemoveStatQueueProfile,
|
||||
testV1STSGetStatQueueProfileAfterRemove,
|
||||
testV1STSStopEngine,
|
||||
//testV1STSProcessEvent,
|
||||
//testV1STSGetStatQueueProfileBeforeSet,
|
||||
//testV1STSSetStatQueueProfile,
|
||||
//testV1STSGetStatQueueProfileAfterSet,
|
||||
//testV1STSUpdateStatQueueProfile,
|
||||
//testV1STSGetStatQueueProfileAfterUpdate,
|
||||
//testV1STSRemoveStatQueueProfile,
|
||||
//testV1STSGetStatQueueProfileAfterRemove,
|
||||
//testV1STSStopEngine,
|
||||
}
|
||||
|
||||
//Test start here
|
||||
@@ -128,47 +133,35 @@ func testV1STSRpcConn(t *testing.T) {
|
||||
|
||||
func testV1STSFromFolder(t *testing.T) {
|
||||
var reply string
|
||||
time.Sleep(time.Duration(2000) * time.Millisecond)
|
||||
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
|
||||
if err := stsV1Rpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
time.Sleep(time.Duration(1000) * time.Millisecond)
|
||||
}
|
||||
|
||||
func testV1STSGetStats(t *testing.T) {
|
||||
var reply []string
|
||||
// first attempt should be empty since there is no queue in cache yet
|
||||
if err := stsV1Rpc.Call("StatSV1.GetQueueIDs", struct{}{}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
var metrics map[string]string
|
||||
if err := stsV1Rpc.Call("StatSV1.GetStringMetrics", "Stats1", &metrics); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
var replyStr string
|
||||
if err := stsV1Rpc.Call("StatSV1.LoadQueues", nil, &replyStr); err != nil {
|
||||
t.Error(err)
|
||||
} else if replyStr != utils.OK {
|
||||
t.Errorf("reply received: %s", replyStr)
|
||||
}
|
||||
expectedIDs := []string{"Stats1"}
|
||||
if err := stsV1Rpc.Call("StatSV1.GetQueueIDs", struct{}{}, &reply); err != nil {
|
||||
expectedIDs := []string{"STATS_1"}
|
||||
if err := stsV1Rpc.Call("StatSV1.GetQueueIDs", "cgrates.org", &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(expectedIDs, reply) {
|
||||
t.Errorf("expecting: %+v, received reply: %s", expectedIDs, reply)
|
||||
}
|
||||
var metrics map[string]string
|
||||
expectedMetrics := map[string]string{
|
||||
utils.MetaASR: utils.NOT_AVAILABLE,
|
||||
utils.MetaACD: "",
|
||||
}
|
||||
if err := stsV1Rpc.Call("StatSV1.GetStringMetrics", "Stats1", &metrics); err != nil {
|
||||
if err := stsV1Rpc.Call("StatSV1.GetQueueStringMetrics",
|
||||
&utils.TenantID{"cgrates.org", expectedIDs[0]}, &metrics); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(expectedMetrics, metrics) {
|
||||
t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
func testV1STSProcessEvent(t *testing.T) {
|
||||
var reply string
|
||||
if err := stsV1Rpc.Call("StatSV1.ProcessEvent",
|
||||
@@ -203,7 +196,7 @@ func testV1STSProcessEvent(t *testing.T) {
|
||||
utils.MetaACD: "",
|
||||
}
|
||||
var metrics map[string]string
|
||||
if err := stsV1Rpc.Call("StatSV1.GetStringMetrics", "Stats1", &metrics); err != nil {
|
||||
if err := stsV1Rpc.Call("StatSV1.GetQueueStringMetrics", "Stats1", &metrics); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(expectedMetrics, metrics) {
|
||||
t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics)
|
||||
@@ -337,11 +330,11 @@ func BenchmarkStatSV1SetEvent(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkStatSV1GetStringMetrics 20000 94607 ns/op
|
||||
func BenchmarkStatSV1GetStringMetrics(b *testing.B) {
|
||||
// BenchmarkStatSV1GetQueueStringMetrics 20000 94607 ns/op
|
||||
func BenchmarkStatSV1GetQueueStringMetrics(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
var metrics map[string]string
|
||||
if err := stsV1Rpc.Call("StatSV1.GetStringMetrics", "Stats1",
|
||||
if err := stsV1Rpc.Call("StatSV1.GetQueueStringMetrics", "Stats1",
|
||||
&metrics); err != nil {
|
||||
b.Error(err)
|
||||
}
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
#Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],QueueLength[6],TTL[7],Metrics[8],Blocker[9],Stored[10],Weight[11],Thresholds[12]
|
||||
cgrates.org,STATS_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc,true,true,20,THRESH1;THRESH2
|
||||
cgrates.org,STATS_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd,true,true,20,THRESH1;THRESH2
|
||||
|
@@ -307,3 +307,18 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s
|
||||
*reply = metrics
|
||||
return
|
||||
}
|
||||
|
||||
// V1GetQueueIDs returns list of queueIDs registered for a tenant
|
||||
func (sS *StatService) V1GetQueueIDs(tenant string, qIDs *[]string) (err error) {
|
||||
prfx := utils.StatQueuePrefix + tenant + utils.CONCATENATED_KEY_SEP
|
||||
keys, err := sS.dm.DataDB().GetKeysForPrefix(prfx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
retIDs := make([]string, len(keys))
|
||||
for i, key := range keys {
|
||||
retIDs[i] = key[len(prfx):]
|
||||
}
|
||||
*qIDs = retIDs
|
||||
return
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ type TpReader struct {
|
||||
tpid string
|
||||
timezone string
|
||||
dataStorage DataDB
|
||||
dm *DataManager
|
||||
lr LoadReader
|
||||
actions map[string][]*Action
|
||||
actionPlans map[string]*ActionPlan
|
||||
@@ -69,6 +70,7 @@ func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string) *TpReader {
|
||||
tpid: tpid,
|
||||
timezone: timezone,
|
||||
dataStorage: db,
|
||||
dm: NewDataManager(db),
|
||||
lr: lr,
|
||||
}
|
||||
tpr.Init()
|
||||
@@ -2010,8 +2012,16 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
|
||||
log.Print("StatQueues:")
|
||||
}
|
||||
for _, sqTntID := range tpr.statQueues {
|
||||
if err = tpr.dataStorage.SetStoredStatQueue(&StoredStatQueue{Tenant: sqTntID.Tenant, ID: sqTntID.ID,
|
||||
SQMetrics: make(map[string][]byte)}); err != nil {
|
||||
sq := &StatQueue{Tenant: sqTntID.Tenant, ID: sqTntID.ID,
|
||||
SQMetrics: make(map[string]StatMetric)}
|
||||
for _, metricID := range tpr.sqProfiles[sqTntID.Tenant][sqTntID.ID].Metrics {
|
||||
if metric, err := NewStatMetric(metricID); err != nil {
|
||||
return err
|
||||
} else {
|
||||
sq.SQMetrics[metricID] = metric
|
||||
}
|
||||
}
|
||||
if err = tpr.dm.SetStatQueue(sq); err != nil {
|
||||
return
|
||||
}
|
||||
if verbose {
|
||||
|
||||
Reference in New Issue
Block a user