Conflicts:
	data/tariffplans/tutorial/Stats.csv
	stats/service.go
This commit is contained in:
Edwardro22
2017-08-03 17:00:01 +03:00
5 changed files with 309 additions and 25 deletions

View File

@@ -62,7 +62,23 @@ func (stsv1 *StatSV1) Call(serviceMethod string, args interface{}, reply interfa
return err
}
// GetLimitsForEvent returns ResourceLimits matching a specific event
// ProcessEvent returns processes a new Event
func (stsv1 *StatSV1) ProcessEvent(ev engine.StatsEvent, reply *string) error {
return stsv1.sts.V1ProcessEvent(ev, reply)
}
// GetQueueIDs returns the list of queues IDs in the system
func (stsv1 *StatSV1) GetQueueIDs(ignored struct{}, reply *[]string) (err error) {
return stsv1.sts.V1GetQueueIDs(ignored, reply)
}
// GetStatMetrics returns the metrics for a queueID
func (stsv1 *StatSV1) GetStatMetrics(queueID string, reply *map[string]string) (err error) {
return stsv1.sts.V1GetStatMetrics(queueID, reply)
}
// LoadQueues loads from dataDB into statsService the queueIDs specified
// loads all when qIDs is nil
func (stsv1 *StatSV1) LoadQueues(args stats.ArgsLoadQueues, reply *string) (err error) {
return stsv1.sts.V1LoadQueues(args, reply)
}

157
apier/v1/stats_it_test.go Normal file
View File

@@ -0,0 +1,157 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
import (
"net/rpc"
"net/rpc/jsonrpc"
"path"
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
stsV1CfgPath string
stsV1Cfg *config.CGRConfig
stsV1Rpc *rpc.Client
)
func TestStatSV1LoadConfig(t *testing.T) {
var err error
stsV1CfgPath = path.Join(*dataDir, "conf", "samples", "stats")
if stsV1Cfg, err = config.NewCGRConfigFromFolder(stsV1CfgPath); err != nil {
t.Error(err)
}
}
func TestStatSV1InitDataDb(t *testing.T) {
if err := engine.InitDataDb(stsV1Cfg); err != nil {
t.Fatal(err)
}
}
func TestStatSV1StartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(stsV1CfgPath, 1000); err != nil {
t.Fatal(err)
}
}
func TestStatSV1RpcConn(t *testing.T) {
var err error
stsV1Rpc, err = jsonrpc.Dial("tcp", stsV1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
}
func TestStatSV1TPFromFolder(t *testing.T) {
var reply string
time.Sleep(time.Duration(2000) * time.Millisecond)
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
if err := stsV1Rpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
t.Error(err)
}
time.Sleep(time.Duration(1000) * time.Millisecond)
}
func TestStatSV1GetStats(t *testing.T) {
var reply []string
// first attempt should be empty since there is no queue in cache yet
if err := stsV1Rpc.Call("StatSV1.GetQueueIDs", struct{}{}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
var metrics map[string]string
if err := stsV1Rpc.Call("StatSV1.GetStatMetrics", "Stats1", &metrics); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
var replyStr string
if err := stsV1Rpc.Call("StatSV1.LoadQueues", nil, &replyStr); err != nil {
t.Error(err)
} else if replyStr != utils.OK {
t.Errorf("reply received: %s", replyStr)
}
expectedIDs := []string{"Stats1"}
if err := stsV1Rpc.Call("StatSV1.GetQueueIDs", struct{}{}, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expectedIDs, reply) {
t.Errorf("expecting: %+v, received reply: %s", expectedIDs, reply)
}
expectedMetrics := map[string]string{
utils.MetaASR: utils.NOT_AVAILABLE,
utils.MetaACD: "",
}
if err := stsV1Rpc.Call("StatSV1.GetStatMetrics", "Stats1", &metrics); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expectedMetrics, metrics) {
t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics)
}
}
func TestStatSV1ProcessEvent(t *testing.T) {
var reply string
if err := stsV1Rpc.Call("StatSV1.ProcessEvent",
engine.StatsEvent{
utils.ID: "event1",
utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)},
&reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("received reply: %s", reply)
}
if err := stsV1Rpc.Call("StatSV1.ProcessEvent",
engine.StatsEvent{
utils.ID: "event2",
utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)},
&reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("received reply: %s", reply)
}
if err := stsV1Rpc.Call("StatSV1.ProcessEvent",
map[string]interface{}{
utils.ID: "event3",
utils.SETUP_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)},
&reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("received reply: %s", reply)
}
expectedMetrics := map[string]string{
utils.MetaASR: "66.66667%",
utils.MetaACD: "",
}
var metrics map[string]string
if err := stsV1Rpc.Call("StatSV1.GetStatMetrics", "Stats1", &metrics); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expectedMetrics, metrics) {
t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics)
}
}
func TestStatSV1StopEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
t.Error(err)
}
}

View File

@@ -54,7 +54,7 @@ func TestTutITInitCfg(t *testing.T) {
config.SetCgrConfig(tutFsLocalCfg)
}
// Remove data in both rating and accounting db
// Remove data in dataDB
func TestTutITResetDataDb(t *testing.T) {
if err := engine.InitDataDb(tutFsLocalCfg); err != nil {
t.Fatal(err)

View File

@@ -40,24 +40,17 @@ func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval tim
if err != nil {
return nil, err
}
ss.stInsts = make(StatsInstances, len(sqPrfxs))
for i, prfx := range sqPrfxs {
sq, err := dataDB.GetStatsQueue(prfx[len(utils.StatsQueuePrefix):])
if err != nil {
return nil, err
}
var sqSM *engine.SQStoredMetrics
if sq.Store {
if sqSM, err = dataDB.GetSQStoredMetrics(sq.ID); err != nil && err != utils.ErrNotFound {
return nil, err
}
}
if ss.stInsts[i], err = NewStatsInstance(ss.evCache, ss.ms, sq, sqSM); err != nil {
ss.queuesCache = make(map[string]*StatsInstance)
ss.queues = make(StatsInstances, 0)
for _, prfx := range sqPrfxs {
if q, err := ss.loadQueue(prfx[len(utils.StatsQueuePrefix):]); err != nil {
return nil, err
} else {
ss.setQueue(q)
}
}
ss.stInsts.Sort()
go ss.dumpStoredMetrics()
ss.queues.Sort()
go ss.dumpStoredMetrics() // start dumpStoredMetrics loop
return
}
@@ -68,8 +61,10 @@ type StatService struct {
ms engine.Marshaler
storeInterval time.Duration
stopStoring chan struct{}
evCache *StatsEventCache // so we can pass it to queues
stInsts StatsInstances // ordered list of StatsQueues
evCache *StatsEventCache // so we can pass it to queues
queuesCache map[string]*StatsInstance // unordered db of StatsQueues, used for fast queries
queues StatsInstances // ordered list of StatsQueues
}
// ListenAndServe loops keeps the service alive
@@ -80,7 +75,7 @@ func (ss *StatService) ListenAndServe(exitChan chan bool) error {
}
// Called to shutdown the service
// ToDo: improve with context, ie, following http implementation
// ToDo: improve with context, ie following http implementation
func (ss *StatService) Shutdown() error {
utils.Logger.Info("<StatS> service shutdown initialized")
close(ss.stopStoring)
@@ -89,9 +84,38 @@ func (ss *StatService) Shutdown() error {
return nil
}
// setQueue adds or modifies a queue into cache
// sort will reorder the ss.queues
func (ss *StatService) loadQueue(qID string) (q *StatsInstance, err error) {
sq, err := ss.dataDB.GetStatsQueue(qID, false, utils.NonTransactional)
if err != nil {
return nil, err
}
var sqSM *engine.SQStoredMetrics
if sq.Store {
if sqSM, err = ss.dataDB.GetSQStoredMetrics(sq.ID); err != nil && err != utils.ErrNotFound {
return nil, err
}
}
return NewStatsInstance(ss.evCache, ss.ms, sq, sqSM)
}
func (ss *StatService) setQueue(q *StatsInstance) {
ss.queuesCache[q.cfg.ID] = q
ss.queues = append(ss.queues, q)
}
// remQueue will remove a queue based on it's ID
func (ss *StatService) remQueue(qID string) (si *StatsInstance) {
si = ss.queuesCache[qID]
ss.queues.remWithID(qID)
delete(ss.queuesCache, qID)
return
}
// store stores the necessary storedMetrics to dataDB
func (ss *StatService) storeMetrics() {
for _, si := range ss.stInsts {
for _, si := range ss.queues {
if !si.cfg.Store || !si.dirty { // no need to save
continue
}
@@ -126,7 +150,7 @@ func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) {
if evStatsID == "" { // ID is mandatory
return errors.New("missing ID field")
}
for _, stInst := range ss.stInsts {
for _, stInst := range ss.queues {
if err := stInst.ProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<StatService> QueueID: %s, ignoring event with ID: %s, error: %s",
@@ -143,3 +167,71 @@ func (ss *StatService) V1ProcessEvent(ev engine.StatsEvent, reply *string) (err
}
return
}
func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err error) {
if len(ss.queuesCache) == 0 {
return utils.ErrNotFound
}
for k := range ss.queuesCache {
*reply = append(*reply, k)
}
return
}
func (ss *StatService) V1GetStatMetrics(queueID string, reply *map[string]string) (err error) {
sq, has := ss.queuesCache[queueID]
if !has {
return utils.ErrNotFound
}
metrics := make(map[string]string, len(sq.sqMetrics))
for metricID, metric := range sq.sqMetrics {
metrics[metricID] = metric.GetStringValue("")
}
*reply = metrics
return
}
type ArgsLoadQueues struct {
QueueIDs *[]string
}
// V1LoadQueues loads the queues specified by qIDs into the service
// loads all if qIDs is nil
func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) {
qIDs := args.QueueIDs
if qIDs == nil {
sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsQueuePrefix)
if err != nil {
return err
}
queueIDs := make([]string, len(sqPrfxs))
for i, prfx := range sqPrfxs {
queueIDs[i] = prfx[len(utils.StatsQueuePrefix):]
}
if len(queueIDs) != 0 {
qIDs = &queueIDs
}
}
if qIDs == nil || len(*qIDs) == 0 {
return utils.ErrNotFound
}
var sQs []*StatsInstance // cache here so we lock only later when data available
for _, qID := range *qIDs {
if _, hasPrev := ss.queuesCache[qID]; hasPrev {
continue // don't overwrite previous, could be extended in the future by carefully checking cached events
}
if q, err := ss.loadQueue(qID); err != nil {
return err
} else {
sQs = append(sQs, q)
}
}
ss.Lock()
for _, q := range sQs {
ss.setQueue(q)
}
ss.queues.Sort()
ss.Unlock()
*reply = utils.OK
return
}

View File

@@ -35,18 +35,37 @@ func (sis StatsInstances) Sort() {
sort.Slice(sis, func(i, j int) bool { return sis[i].cfg.Weight > sis[j].cfg.Weight })
}
// remWithID removes the queue with ID from slice
func (sis StatsInstances) remWithID(qID string) {
for i, q := range sis {
if q.cfg.ID == qID {
copy(sis[i:], sis[i+1:])
sis[len(sis)-1] = nil
sis = sis[:len(sis)-1]
break // there can be only one item with ID
}
}
}
// NewStatsInstance instantiates a StatsInstance
func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler,
sqCfg *engine.StatsQueue, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) {
si = &StatsInstance{sec: sec, ms: ms, cfg: sqCfg}
si = &StatsInstance{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)}
for _, metricID := range sqCfg.Metrics {
if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil {
return
}
}
if sqSM != nil {
for evID, ev := range sqSM.SEvents {
si.sec.Cache(evID, ev, si.cfg.ID)
}
si.sqItems = sqSM.SQItems
for metricID := range si.sqMetrics {
if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil {
return
if _, has := si.sqMetrics[metricID]; !has {
if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil {
return
}
}
if stored, has := sqSM.SQMetrics[metricID]; !has {
continue