mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
StatsV1.GetQueueIDs, StatsV1.GetStatMetrics, StatsV1.LoadQueues with tests
This commit is contained in:
@@ -62,7 +62,23 @@ func (stsv1 *StatSV1) Call(serviceMethod string, args interface{}, reply interfa
|
||||
return err
|
||||
}
|
||||
|
||||
// GetLimitsForEvent returns ResourceLimits matching a specific event
|
||||
// ProcessEvent returns processes a new Event
|
||||
func (stsv1 *StatSV1) ProcessEvent(ev engine.StatsEvent, reply *string) error {
|
||||
return stsv1.sts.V1ProcessEvent(ev, reply)
|
||||
}
|
||||
|
||||
// GetQueueIDs returns the list of queues IDs in the system
|
||||
func (stsv1 *StatSV1) GetQueueIDs(ignored struct{}, reply *[]string) (err error) {
|
||||
return stsv1.sts.V1GetQueueIDs(ignored, reply)
|
||||
}
|
||||
|
||||
// GetStatMetrics returns the metrics for a queueID
|
||||
func (stsv1 *StatSV1) GetStatMetrics(queueID string, reply *map[string]string) (err error) {
|
||||
return stsv1.sts.V1GetStatMetrics(queueID, reply)
|
||||
}
|
||||
|
||||
// LoadQueues loads from dataDB into statsService the queueIDs specified
|
||||
// loads all when qIDs is nil
|
||||
func (stsv1 *StatSV1) LoadQueues(args stats.ArgsLoadQueues, reply *string) (err error) {
|
||||
return stsv1.sts.V1LoadQueues(args, reply)
|
||||
}
|
||||
|
||||
153
apier/v1/stats_it_test.go
Normal file
153
apier/v1/stats_it_test.go
Normal file
@@ -0,0 +1,153 @@
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package v1
|
||||
|
||||
import (
|
||||
"net/rpc"
|
||||
"net/rpc/jsonrpc"
|
||||
"path"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
stsV1CfgPath string
|
||||
stsV1Cfg *config.CGRConfig
|
||||
stsV1Rpc *rpc.Client
|
||||
)
|
||||
|
||||
func TestStatSV1LoadConfig(t *testing.T) {
|
||||
var err error
|
||||
stsV1CfgPath = path.Join(*dataDir, "conf", "samples", "stats")
|
||||
if stsV1Cfg, err = config.NewCGRConfigFromFolder(stsV1CfgPath); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatSV1InitDataDb(t *testing.T) {
|
||||
if err := engine.InitDataDb(stsV1Cfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
func TestStatSV1StartEngine(t *testing.T) {
|
||||
if _, err := engine.StopStartEngine(stsV1CfgPath, 1000); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
func TestStatSV1RpcConn(t *testing.T) {
|
||||
var err error
|
||||
stsV1Rpc, err = jsonrpc.Dial("tcp", stsV1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
|
||||
if err != nil {
|
||||
t.Fatal("Could not connect to rater: ", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatSV1TPFromFolder(t *testing.T) {
|
||||
var reply string
|
||||
time.Sleep(time.Duration(2000) * time.Millisecond)
|
||||
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
|
||||
if err := stsV1Rpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
time.Sleep(time.Duration(1000) * time.Millisecond)
|
||||
}
|
||||
|
||||
func TestStatSV1GetStats(t *testing.T) {
|
||||
var reply []string
|
||||
// first attempt should be empty since there is no queue in cache yet
|
||||
if err := stsV1Rpc.Call("StatSV1.GetQueueIDs", struct{}{}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
var metrics map[string]string
|
||||
if err := stsV1Rpc.Call("StatSV1.GetStatMetrics", "Stats1", &metrics); err == nil || err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
var replyStr string
|
||||
if err := stsV1Rpc.Call("StatSV1.LoadQueues", nil, &replyStr); err != nil {
|
||||
t.Error(err)
|
||||
} else if replyStr != utils.OK {
|
||||
t.Errorf("reply received: %s", replyStr)
|
||||
}
|
||||
expectedIDs := []string{"Stats1"}
|
||||
if err := stsV1Rpc.Call("StatSV1.GetQueueIDs", struct{}{}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(expectedIDs, reply) {
|
||||
t.Errorf("expecting: %+v, received reply: %s", expectedIDs, reply)
|
||||
}
|
||||
expectedMetrics := map[string]string{}
|
||||
if err := stsV1Rpc.Call("StatSV1.GetStatMetrics", "Stats1", &metrics); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(expectedMetrics, metrics) {
|
||||
t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatSV1ProcessEvent(t *testing.T) {
|
||||
var reply string
|
||||
if err := stsV1Rpc.Call("StatSV1.ProcessEvent",
|
||||
engine.StatsEvent{
|
||||
utils.ID: "event1",
|
||||
utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)},
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("received reply: %s", reply)
|
||||
}
|
||||
if err := stsV1Rpc.Call("StatSV1.ProcessEvent",
|
||||
engine.StatsEvent{
|
||||
utils.ID: "event2",
|
||||
utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)},
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("received reply: %s", reply)
|
||||
}
|
||||
if err := stsV1Rpc.Call("StatSV1.ProcessEvent",
|
||||
map[string]interface{}{
|
||||
utils.ID: "event3",
|
||||
utils.SETUP_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)},
|
||||
&reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("received reply: %s", reply)
|
||||
}
|
||||
expectedMetrics := map[string]string{}
|
||||
var metrics map[string]string
|
||||
if err := stsV1Rpc.Call("StatSV1.GetStatMetrics", "Stats1", &metrics); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(expectedMetrics, metrics) {
|
||||
t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatSV1StopEngine(t *testing.T) {
|
||||
if err := engine.KillEngine(100); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
@@ -54,7 +54,7 @@ func TestTutITInitCfg(t *testing.T) {
|
||||
config.SetCgrConfig(tutFsLocalCfg)
|
||||
}
|
||||
|
||||
// Remove data in both rating and accounting db
|
||||
// Remove data in dataDB
|
||||
func TestTutITResetDataDb(t *testing.T) {
|
||||
if err := engine.InitDataDb(tutFsLocalCfg); err != nil {
|
||||
t.Fatal(err)
|
||||
|
||||
132
stats/service.go
132
stats/service.go
@@ -40,24 +40,17 @@ func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval tim
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ss.stInsts = make(StatsInstances, len(sqPrfxs))
|
||||
for i, prfx := range sqPrfxs {
|
||||
sq, err := dataDB.GetStatsQueue(prfx[len(utils.StatsQueuePrefix):], false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var sqSM *engine.SQStoredMetrics
|
||||
if sq.Store {
|
||||
if sqSM, err = dataDB.GetSQStoredMetrics(sq.ID); err != nil && err != utils.ErrNotFound {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if ss.stInsts[i], err = NewStatsInstance(ss.evCache, ss.ms, sq, sqSM); err != nil {
|
||||
ss.queuesCache = make(map[string]*StatsInstance)
|
||||
ss.queues = make(StatsInstances, 0)
|
||||
for _, prfx := range sqPrfxs {
|
||||
if q, err := ss.loadQueue(prfx[len(utils.StatsQueuePrefix):]); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
ss.setQueue(q)
|
||||
}
|
||||
}
|
||||
ss.stInsts.Sort()
|
||||
go ss.dumpStoredMetrics()
|
||||
ss.queues.Sort()
|
||||
go ss.dumpStoredMetrics() // start dumpStoredMetrics loop
|
||||
return
|
||||
}
|
||||
|
||||
@@ -68,8 +61,10 @@ type StatService struct {
|
||||
ms engine.Marshaler
|
||||
storeInterval time.Duration
|
||||
stopStoring chan struct{}
|
||||
evCache *StatsEventCache // so we can pass it to queues
|
||||
stInsts StatsInstances // ordered list of StatsQueues
|
||||
evCache *StatsEventCache // so we can pass it to queues
|
||||
queuesCache map[string]*StatsInstance // unordered db of StatsQueues, used for fast queries
|
||||
queues StatsInstances // ordered list of StatsQueues
|
||||
|
||||
}
|
||||
|
||||
// ListenAndServe loops keeps the service alive
|
||||
@@ -80,7 +75,7 @@ func (ss *StatService) ListenAndServe(exitChan chan bool) error {
|
||||
}
|
||||
|
||||
// Called to shutdown the service
|
||||
// ToDo: improve with context, ie, following http implementation
|
||||
// ToDo: improve with context, ie following http implementation
|
||||
func (ss *StatService) Shutdown() error {
|
||||
utils.Logger.Info("<StatS> service shutdown initialized")
|
||||
close(ss.stopStoring)
|
||||
@@ -89,9 +84,38 @@ func (ss *StatService) Shutdown() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// setQueue adds or modifies a queue into cache
|
||||
// sort will reorder the ss.queues
|
||||
func (ss *StatService) loadQueue(qID string) (q *StatsInstance, err error) {
|
||||
sq, err := ss.dataDB.GetStatsQueue(qID, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var sqSM *engine.SQStoredMetrics
|
||||
if sq.Store {
|
||||
if sqSM, err = ss.dataDB.GetSQStoredMetrics(sq.ID); err != nil && err != utils.ErrNotFound {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return NewStatsInstance(ss.evCache, ss.ms, sq, sqSM)
|
||||
}
|
||||
|
||||
func (ss *StatService) setQueue(q *StatsInstance) {
|
||||
ss.queuesCache[q.cfg.ID] = q
|
||||
ss.queues = append(ss.queues, q)
|
||||
}
|
||||
|
||||
// remQueue will remove a queue based on it's ID
|
||||
func (ss *StatService) remQueue(qID string) (si *StatsInstance) {
|
||||
si = ss.queuesCache[qID]
|
||||
ss.queues.remWithID(qID)
|
||||
delete(ss.queuesCache, qID)
|
||||
return
|
||||
}
|
||||
|
||||
// store stores the necessary storedMetrics to dataDB
|
||||
func (ss *StatService) storeMetrics() {
|
||||
for _, si := range ss.stInsts {
|
||||
for _, si := range ss.queues {
|
||||
if !si.cfg.Store || !si.dirty { // no need to save
|
||||
continue
|
||||
}
|
||||
@@ -126,7 +150,7 @@ func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) {
|
||||
if evStatsID == "" { // ID is mandatory
|
||||
return errors.New("missing ID field")
|
||||
}
|
||||
for _, stInst := range ss.stInsts {
|
||||
for _, stInst := range ss.queues {
|
||||
if err := stInst.ProcessEvent(ev); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<StatService> QueueID: %s, ignoring event with ID: %s, error: %s",
|
||||
@@ -143,3 +167,71 @@ func (ss *StatService) V1ProcessEvent(ev engine.StatsEvent, reply *string) (err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err error) {
|
||||
if len(ss.queuesCache) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
for k := range ss.queuesCache {
|
||||
*reply = append(*reply, k)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (ss *StatService) V1GetStatMetrics(queueID string, reply *map[string]string) (err error) {
|
||||
sq, has := ss.queuesCache[queueID]
|
||||
if !has {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
metrics := make(map[string]string, len(sq.sqMetrics))
|
||||
for metricID, metric := range sq.sqMetrics {
|
||||
metrics[metricID] = metric.GetStringValue("")
|
||||
}
|
||||
*reply = metrics
|
||||
return
|
||||
}
|
||||
|
||||
type ArgsLoadQueues struct {
|
||||
QueueIDs *[]string
|
||||
}
|
||||
|
||||
// V1LoadQueues loads the queues specified by qIDs into the service
|
||||
// loads all if qIDs is nil
|
||||
func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) {
|
||||
qIDs := args.QueueIDs
|
||||
if qIDs == nil {
|
||||
sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsQueuePrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
queueIDs := make([]string, len(sqPrfxs))
|
||||
for i, prfx := range sqPrfxs {
|
||||
queueIDs[i] = prfx[len(utils.StatsQueuePrefix):]
|
||||
}
|
||||
if len(queueIDs) != 0 {
|
||||
qIDs = &queueIDs
|
||||
}
|
||||
}
|
||||
if qIDs == nil || len(*qIDs) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
var sQs []*StatsInstance // cache here so we lock only later when data available
|
||||
for _, qID := range *qIDs {
|
||||
if _, hasPrev := ss.queuesCache[qID]; hasPrev {
|
||||
continue // don't overwrite previous, could be extended in the future by carefully checking cached events
|
||||
}
|
||||
if q, err := ss.loadQueue(qID); err != nil {
|
||||
return err
|
||||
} else {
|
||||
sQs = append(sQs, q)
|
||||
}
|
||||
}
|
||||
ss.Lock()
|
||||
for _, q := range sQs {
|
||||
ss.setQueue(q)
|
||||
}
|
||||
ss.queues.Sort()
|
||||
ss.Unlock()
|
||||
*reply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
@@ -35,6 +35,18 @@ func (sis StatsInstances) Sort() {
|
||||
sort.Slice(sis, func(i, j int) bool { return sis[i].cfg.Weight > sis[j].cfg.Weight })
|
||||
}
|
||||
|
||||
// remWithID removes the queue with ID from slice
|
||||
func (sis StatsInstances) remWithID(qID string) {
|
||||
for i, q := range sis {
|
||||
if q.cfg.ID == qID {
|
||||
copy(sis[i:], sis[i+1:])
|
||||
sis[len(sis)-1] = nil
|
||||
sis = sis[:len(sis)-1]
|
||||
break // there can be only one item with ID
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewStatsInstance instantiates a StatsInstance
|
||||
func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler,
|
||||
sqCfg *engine.StatsQueue, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) {
|
||||
|
||||
Reference in New Issue
Block a user