diff --git a/apier/v1/stats.go b/apier/v1/stats.go
index 4c5a1437e..f83ba54c0 100644
--- a/apier/v1/stats.go
+++ b/apier/v1/stats.go
@@ -62,7 +62,23 @@ func (stsv1 *StatSV1) Call(serviceMethod string, args interface{}, reply interfa
return err
}
-// GetLimitsForEvent returns ResourceLimits matching a specific event
+// ProcessEvent returns processes a new Event
func (stsv1 *StatSV1) ProcessEvent(ev engine.StatsEvent, reply *string) error {
return stsv1.sts.V1ProcessEvent(ev, reply)
}
+
+// GetQueueIDs returns the list of queues IDs in the system
+func (stsv1 *StatSV1) GetQueueIDs(ignored struct{}, reply *[]string) (err error) {
+ return stsv1.sts.V1GetQueueIDs(ignored, reply)
+}
+
+// GetStatMetrics returns the metrics for a queueID
+func (stsv1 *StatSV1) GetStatMetrics(queueID string, reply *map[string]string) (err error) {
+ return stsv1.sts.V1GetStatMetrics(queueID, reply)
+}
+
+// LoadQueues loads from dataDB into statsService the queueIDs specified
+// loads all when qIDs is nil
+func (stsv1 *StatSV1) LoadQueues(args stats.ArgsLoadQueues, reply *string) (err error) {
+ return stsv1.sts.V1LoadQueues(args, reply)
+}
diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go
new file mode 100644
index 000000000..1e7f62f0f
--- /dev/null
+++ b/apier/v1/stats_it_test.go
@@ -0,0 +1,153 @@
+// +build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package v1
+
+import (
+ "net/rpc"
+ "net/rpc/jsonrpc"
+ "path"
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+var (
+ stsV1CfgPath string
+ stsV1Cfg *config.CGRConfig
+ stsV1Rpc *rpc.Client
+)
+
+func TestStatSV1LoadConfig(t *testing.T) {
+ var err error
+ stsV1CfgPath = path.Join(*dataDir, "conf", "samples", "stats")
+ if stsV1Cfg, err = config.NewCGRConfigFromFolder(stsV1CfgPath); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestStatSV1InitDataDb(t *testing.T) {
+ if err := engine.InitDataDb(stsV1Cfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+/*
+func TestStatSV1StartEngine(t *testing.T) {
+ if _, err := engine.StopStartEngine(stsV1CfgPath, 1000); err != nil {
+ t.Fatal(err)
+ }
+}
+*/
+
+func TestStatSV1RpcConn(t *testing.T) {
+ var err error
+ stsV1Rpc, err = jsonrpc.Dial("tcp", stsV1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
+ if err != nil {
+ t.Fatal("Could not connect to rater: ", err.Error())
+ }
+}
+
+func TestStatSV1TPFromFolder(t *testing.T) {
+ var reply string
+ time.Sleep(time.Duration(2000) * time.Millisecond)
+ attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
+ if err := stsV1Rpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
+ t.Error(err)
+ }
+ time.Sleep(time.Duration(1000) * time.Millisecond)
+}
+
+func TestStatSV1GetStats(t *testing.T) {
+ var reply []string
+ // first attempt should be empty since there is no queue in cache yet
+ if err := stsV1Rpc.Call("StatSV1.GetQueueIDs", struct{}{}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
+ t.Error(err)
+ }
+ var metrics map[string]string
+ if err := stsV1Rpc.Call("StatSV1.GetStatMetrics", "Stats1", &metrics); err == nil || err.Error() != utils.ErrNotFound.Error() {
+ t.Error(err)
+ }
+ var replyStr string
+ if err := stsV1Rpc.Call("StatSV1.LoadQueues", nil, &replyStr); err != nil {
+ t.Error(err)
+ } else if replyStr != utils.OK {
+ t.Errorf("reply received: %s", replyStr)
+ }
+ expectedIDs := []string{"Stats1"}
+ if err := stsV1Rpc.Call("StatSV1.GetQueueIDs", struct{}{}, &reply); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(expectedIDs, reply) {
+ t.Errorf("expecting: %+v, received reply: %s", expectedIDs, reply)
+ }
+ expectedMetrics := map[string]string{}
+ if err := stsV1Rpc.Call("StatSV1.GetStatMetrics", "Stats1", &metrics); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(expectedMetrics, metrics) {
+ t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics)
+ }
+}
+
+func TestStatSV1ProcessEvent(t *testing.T) {
+ var reply string
+ if err := stsV1Rpc.Call("StatSV1.ProcessEvent",
+ engine.StatsEvent{
+ utils.ID: "event1",
+ utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)},
+ &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("received reply: %s", reply)
+ }
+ if err := stsV1Rpc.Call("StatSV1.ProcessEvent",
+ engine.StatsEvent{
+ utils.ID: "event2",
+ utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)},
+ &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("received reply: %s", reply)
+ }
+ if err := stsV1Rpc.Call("StatSV1.ProcessEvent",
+ map[string]interface{}{
+ utils.ID: "event3",
+ utils.SETUP_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)},
+ &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("received reply: %s", reply)
+ }
+ expectedMetrics := map[string]string{}
+ var metrics map[string]string
+ if err := stsV1Rpc.Call("StatSV1.GetStatMetrics", "Stats1", &metrics); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(expectedMetrics, metrics) {
+ t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics)
+ }
+}
+
+func TestStatSV1StopEngine(t *testing.T) {
+ if err := engine.KillEngine(100); err != nil {
+ t.Error(err)
+ }
+}
diff --git a/general_tests/tutorial_it_test.go b/general_tests/tutorial_it_test.go
index b9e866877..eda77255f 100644
--- a/general_tests/tutorial_it_test.go
+++ b/general_tests/tutorial_it_test.go
@@ -54,7 +54,7 @@ func TestTutITInitCfg(t *testing.T) {
config.SetCgrConfig(tutFsLocalCfg)
}
-// Remove data in both rating and accounting db
+// Remove data in dataDB
func TestTutITResetDataDb(t *testing.T) {
if err := engine.InitDataDb(tutFsLocalCfg); err != nil {
t.Fatal(err)
diff --git a/stats/service.go b/stats/service.go
index 9eebe388d..b78777527 100644
--- a/stats/service.go
+++ b/stats/service.go
@@ -40,24 +40,17 @@ func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval tim
if err != nil {
return nil, err
}
- ss.stInsts = make(StatsInstances, len(sqPrfxs))
- for i, prfx := range sqPrfxs {
- sq, err := dataDB.GetStatsQueue(prfx[len(utils.StatsQueuePrefix):], false, utils.NonTransactional)
- if err != nil {
- return nil, err
- }
- var sqSM *engine.SQStoredMetrics
- if sq.Store {
- if sqSM, err = dataDB.GetSQStoredMetrics(sq.ID); err != nil && err != utils.ErrNotFound {
- return nil, err
- }
- }
- if ss.stInsts[i], err = NewStatsInstance(ss.evCache, ss.ms, sq, sqSM); err != nil {
+ ss.queuesCache = make(map[string]*StatsInstance)
+ ss.queues = make(StatsInstances, 0)
+ for _, prfx := range sqPrfxs {
+ if q, err := ss.loadQueue(prfx[len(utils.StatsQueuePrefix):]); err != nil {
return nil, err
+ } else {
+ ss.setQueue(q)
}
}
- ss.stInsts.Sort()
- go ss.dumpStoredMetrics()
+ ss.queues.Sort()
+ go ss.dumpStoredMetrics() // start dumpStoredMetrics loop
return
}
@@ -68,8 +61,10 @@ type StatService struct {
ms engine.Marshaler
storeInterval time.Duration
stopStoring chan struct{}
- evCache *StatsEventCache // so we can pass it to queues
- stInsts StatsInstances // ordered list of StatsQueues
+ evCache *StatsEventCache // so we can pass it to queues
+ queuesCache map[string]*StatsInstance // unordered db of StatsQueues, used for fast queries
+ queues StatsInstances // ordered list of StatsQueues
+
}
// ListenAndServe loops keeps the service alive
@@ -80,7 +75,7 @@ func (ss *StatService) ListenAndServe(exitChan chan bool) error {
}
// Called to shutdown the service
-// ToDo: improve with context, ie, following http implementation
+// ToDo: improve with context, ie following http implementation
func (ss *StatService) Shutdown() error {
utils.Logger.Info(" service shutdown initialized")
close(ss.stopStoring)
@@ -89,9 +84,38 @@ func (ss *StatService) Shutdown() error {
return nil
}
+// setQueue adds or modifies a queue into cache
+// sort will reorder the ss.queues
+func (ss *StatService) loadQueue(qID string) (q *StatsInstance, err error) {
+ sq, err := ss.dataDB.GetStatsQueue(qID, false, utils.NonTransactional)
+ if err != nil {
+ return nil, err
+ }
+ var sqSM *engine.SQStoredMetrics
+ if sq.Store {
+ if sqSM, err = ss.dataDB.GetSQStoredMetrics(sq.ID); err != nil && err != utils.ErrNotFound {
+ return nil, err
+ }
+ }
+ return NewStatsInstance(ss.evCache, ss.ms, sq, sqSM)
+}
+
+func (ss *StatService) setQueue(q *StatsInstance) {
+ ss.queuesCache[q.cfg.ID] = q
+ ss.queues = append(ss.queues, q)
+}
+
+// remQueue will remove a queue based on it's ID
+func (ss *StatService) remQueue(qID string) (si *StatsInstance) {
+ si = ss.queuesCache[qID]
+ ss.queues.remWithID(qID)
+ delete(ss.queuesCache, qID)
+ return
+}
+
// store stores the necessary storedMetrics to dataDB
func (ss *StatService) storeMetrics() {
- for _, si := range ss.stInsts {
+ for _, si := range ss.queues {
if !si.cfg.Store || !si.dirty { // no need to save
continue
}
@@ -126,7 +150,7 @@ func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) {
if evStatsID == "" { // ID is mandatory
return errors.New("missing ID field")
}
- for _, stInst := range ss.stInsts {
+ for _, stInst := range ss.queues {
if err := stInst.ProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s",
@@ -143,3 +167,71 @@ func (ss *StatService) V1ProcessEvent(ev engine.StatsEvent, reply *string) (err
}
return
}
+
+func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err error) {
+ if len(ss.queuesCache) == 0 {
+ return utils.ErrNotFound
+ }
+ for k := range ss.queuesCache {
+ *reply = append(*reply, k)
+ }
+ return
+}
+
+func (ss *StatService) V1GetStatMetrics(queueID string, reply *map[string]string) (err error) {
+ sq, has := ss.queuesCache[queueID]
+ if !has {
+ return utils.ErrNotFound
+ }
+ metrics := make(map[string]string, len(sq.sqMetrics))
+ for metricID, metric := range sq.sqMetrics {
+ metrics[metricID] = metric.GetStringValue("")
+ }
+ *reply = metrics
+ return
+}
+
+type ArgsLoadQueues struct {
+ QueueIDs *[]string
+}
+
+// V1LoadQueues loads the queues specified by qIDs into the service
+// loads all if qIDs is nil
+func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) {
+ qIDs := args.QueueIDs
+ if qIDs == nil {
+ sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsQueuePrefix)
+ if err != nil {
+ return err
+ }
+ queueIDs := make([]string, len(sqPrfxs))
+ for i, prfx := range sqPrfxs {
+ queueIDs[i] = prfx[len(utils.StatsQueuePrefix):]
+ }
+ if len(queueIDs) != 0 {
+ qIDs = &queueIDs
+ }
+ }
+ if qIDs == nil || len(*qIDs) == 0 {
+ return utils.ErrNotFound
+ }
+ var sQs []*StatsInstance // cache here so we lock only later when data available
+ for _, qID := range *qIDs {
+ if _, hasPrev := ss.queuesCache[qID]; hasPrev {
+ continue // don't overwrite previous, could be extended in the future by carefully checking cached events
+ }
+ if q, err := ss.loadQueue(qID); err != nil {
+ return err
+ } else {
+ sQs = append(sQs, q)
+ }
+ }
+ ss.Lock()
+ for _, q := range sQs {
+ ss.setQueue(q)
+ }
+ ss.queues.Sort()
+ ss.Unlock()
+ *reply = utils.OK
+ return
+}
diff --git a/stats/sinstance.go b/stats/sinstance.go
index 55253f1d7..31cb5104e 100644
--- a/stats/sinstance.go
+++ b/stats/sinstance.go
@@ -35,6 +35,18 @@ func (sis StatsInstances) Sort() {
sort.Slice(sis, func(i, j int) bool { return sis[i].cfg.Weight > sis[j].cfg.Weight })
}
+// remWithID removes the queue with ID from slice
+func (sis StatsInstances) remWithID(qID string) {
+ for i, q := range sis {
+ if q.cfg.ID == qID {
+ copy(sis[i:], sis[i+1:])
+ sis[len(sis)-1] = nil
+ sis = sis[:len(sis)-1]
+ break // there can be only one item with ID
+ }
+ }
+}
+
// NewStatsInstance instantiates a StatsInstance
func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler,
sqCfg *engine.StatsQueue, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) {