diff --git a/apier/v1/stats.go b/apier/v1/stats.go index 3c46e457f..bc857c176 100644 --- a/apier/v1/stats.go +++ b/apier/v1/stats.go @@ -100,6 +100,11 @@ func (stsv1 *StatSV1) Call(serviceMethod string, args interface{}, reply interfa return err } +// GetQueueIDs returns list of queueIDs registered for a tenant +func (stsv1 *StatSV1) GetQueueIDs(tenant string, qIDs *[]string) error { + return stsv1.sS.V1GetQueueIDs(tenant, qIDs) +} + // ProcessEvent returns processes a new Event func (stsv1 *StatSV1) ProcessEvent(ev *engine.StatEvent, reply *string) error { return stsv1.sS.V1ProcessEvent(ev, reply) diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index b32173f09..68d0adb30 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -19,7 +19,6 @@ along with this program. If not, see */ package v1 -/* import ( "math/rand" "net/rpc" @@ -43,16 +42,22 @@ var ( statsDelay int ) -var evs = []engine.StatsEvent{ - engine.StatsEvent{ - utils.ID: "event1", - utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local()}, - engine.StatsEvent{ - utils.ID: "event2", - utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local()}, - engine.StatsEvent{ - utils.ID: "event3", - utils.SETUP_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local()}, +var evs = []*engine.StatEvent{ + &engine.StatEvent{ + Tenant: "cgrates.org", + ID: "event1", + Fields: map[string]interface{}{ + utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local()}}, + &engine.StatEvent{ + Tenant: "cgrates.org", + ID: "event2", + Fields: map[string]interface{}{ + utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local()}}, + &engine.StatEvent{ + Tenant: "cgrates.org", + ID: "event3", + Fields: map[string]interface{}{ + utils.SETUP_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local()}}, } func init() { @@ -66,15 +71,15 @@ var sTestsStatSV1 = []func(t *testing.T){ testV1STSRpcConn, testV1STSFromFolder, testV1STSGetStats, - testV1STSProcessEvent, - testV1STSGetStatQueueProfileBeforeSet, - testV1STSSetStatQueueProfile, - testV1STSGetStatQueueProfileAfterSet, - testV1STSUpdateStatQueueProfile, - testV1STSGetStatQueueProfileAfterUpdate, - testV1STSRemoveStatQueueProfile, - testV1STSGetStatQueueProfileAfterRemove, - testV1STSStopEngine, + //testV1STSProcessEvent, + //testV1STSGetStatQueueProfileBeforeSet, + //testV1STSSetStatQueueProfile, + //testV1STSGetStatQueueProfileAfterSet, + //testV1STSUpdateStatQueueProfile, + //testV1STSGetStatQueueProfileAfterUpdate, + //testV1STSRemoveStatQueueProfile, + //testV1STSGetStatQueueProfileAfterRemove, + //testV1STSStopEngine, } //Test start here @@ -128,47 +133,35 @@ func testV1STSRpcConn(t *testing.T) { func testV1STSFromFolder(t *testing.T) { var reply string - time.Sleep(time.Duration(2000) * time.Millisecond) attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} if err := stsV1Rpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { t.Error(err) } - time.Sleep(time.Duration(1000) * time.Millisecond) } func testV1STSGetStats(t *testing.T) { var reply []string - // first attempt should be empty since there is no queue in cache yet - if err := stsV1Rpc.Call("StatSV1.GetQueueIDs", struct{}{}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } - var metrics map[string]string - if err := stsV1Rpc.Call("StatSV1.GetStringMetrics", "Stats1", &metrics); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } - var replyStr string - if err := stsV1Rpc.Call("StatSV1.LoadQueues", nil, &replyStr); err != nil { - t.Error(err) - } else if replyStr != utils.OK { - t.Errorf("reply received: %s", replyStr) - } - expectedIDs := []string{"Stats1"} - if err := stsV1Rpc.Call("StatSV1.GetQueueIDs", struct{}{}, &reply); err != nil { + expectedIDs := []string{"STATS_1"} + if err := stsV1Rpc.Call("StatSV1.GetQueueIDs", "cgrates.org", &reply); err != nil { t.Error(err) } else if !reflect.DeepEqual(expectedIDs, reply) { t.Errorf("expecting: %+v, received reply: %s", expectedIDs, reply) } + var metrics map[string]string expectedMetrics := map[string]string{ utils.MetaASR: utils.NOT_AVAILABLE, utils.MetaACD: "", } - if err := stsV1Rpc.Call("StatSV1.GetStringMetrics", "Stats1", &metrics); err != nil { + if err := stsV1Rpc.Call("StatSV1.GetQueueStringMetrics", + &utils.TenantID{"cgrates.org", expectedIDs[0]}, &metrics); err != nil { t.Error(err) } else if !reflect.DeepEqual(expectedMetrics, metrics) { t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics) } } +/* + func testV1STSProcessEvent(t *testing.T) { var reply string if err := stsV1Rpc.Call("StatSV1.ProcessEvent", @@ -203,7 +196,7 @@ func testV1STSProcessEvent(t *testing.T) { utils.MetaACD: "", } var metrics map[string]string - if err := stsV1Rpc.Call("StatSV1.GetStringMetrics", "Stats1", &metrics); err != nil { + if err := stsV1Rpc.Call("StatSV1.GetQueueStringMetrics", "Stats1", &metrics); err != nil { t.Error(err) } else if !reflect.DeepEqual(expectedMetrics, metrics) { t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics) @@ -337,11 +330,11 @@ func BenchmarkStatSV1SetEvent(b *testing.B) { } } -// BenchmarkStatSV1GetStringMetrics 20000 94607 ns/op -func BenchmarkStatSV1GetStringMetrics(b *testing.B) { +// BenchmarkStatSV1GetQueueStringMetrics 20000 94607 ns/op +func BenchmarkStatSV1GetQueueStringMetrics(b *testing.B) { for i := 0; i < b.N; i++ { var metrics map[string]string - if err := stsV1Rpc.Call("StatSV1.GetStringMetrics", "Stats1", + if err := stsV1Rpc.Call("StatSV1.GetQueueStringMetrics", "Stats1", &metrics); err != nil { b.Error(err) } diff --git a/data/tariffplans/tutorial/Stats.csv b/data/tariffplans/tutorial/Stats.csv index 6832735d3..b3122b4db 100755 --- a/data/tariffplans/tutorial/Stats.csv +++ b/data/tariffplans/tutorial/Stats.csv @@ -1,2 +1,2 @@ #Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],QueueLength[6],TTL[7],Metrics[8],Blocker[9],Stored[10],Weight[11],Thresholds[12] -cgrates.org,STATS_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc,true,true,20,THRESH1;THRESH2 \ No newline at end of file +cgrates.org,STATS_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd,true,true,20,THRESH1;THRESH2 \ No newline at end of file diff --git a/engine/stats.go b/engine/stats.go index 1d878755a..00418245e 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -307,3 +307,18 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s *reply = metrics return } + +// V1GetQueueIDs returns list of queueIDs registered for a tenant +func (sS *StatService) V1GetQueueIDs(tenant string, qIDs *[]string) (err error) { + prfx := utils.StatQueuePrefix + tenant + utils.CONCATENATED_KEY_SEP + keys, err := sS.dm.DataDB().GetKeysForPrefix(prfx) + if err != nil { + return err + } + retIDs := make([]string, len(keys)) + for i, key := range keys { + retIDs[i] = key[len(prfx):] + } + *qIDs = retIDs + return +} diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 9c2073ce6..f5040bafc 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -34,6 +34,7 @@ type TpReader struct { tpid string timezone string dataStorage DataDB + dm *DataManager lr LoadReader actions map[string][]*Action actionPlans map[string]*ActionPlan @@ -69,6 +70,7 @@ func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string) *TpReader { tpid: tpid, timezone: timezone, dataStorage: db, + dm: NewDataManager(db), lr: lr, } tpr.Init() @@ -2010,8 +2012,16 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("StatQueues:") } for _, sqTntID := range tpr.statQueues { - if err = tpr.dataStorage.SetStoredStatQueue(&StoredStatQueue{Tenant: sqTntID.Tenant, ID: sqTntID.ID, - SQMetrics: make(map[string][]byte)}); err != nil { + sq := &StatQueue{Tenant: sqTntID.Tenant, ID: sqTntID.ID, + SQMetrics: make(map[string]StatMetric)} + for _, metricID := range tpr.sqProfiles[sqTntID.Tenant][sqTntID.ID].Metrics { + if metric, err := NewStatMetric(metricID); err != nil { + return err + } else { + sq.SQMetrics[metricID] = metric + } + } + if err = tpr.dm.SetStatQueue(sq); err != nil { return } if verbose {