From fa406afc65c3be6a9cc748bb7fef69f877de97df Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 3 Aug 2017 14:31:39 +0200 Subject: [PATCH 1/3] StatsV1.GetQueueIDs, StatsV1.GetStatMetrics, StatsV1.LoadQueues with tests --- apier/v1/stats.go | 18 +++- apier/v1/stats_it_test.go | 153 ++++++++++++++++++++++++++++++ general_tests/tutorial_it_test.go | 2 +- stats/service.go | 132 ++++++++++++++++++++++---- stats/sinstance.go | 12 +++ 5 files changed, 295 insertions(+), 22 deletions(-) create mode 100644 apier/v1/stats_it_test.go diff --git a/apier/v1/stats.go b/apier/v1/stats.go index 4c5a1437e..f83ba54c0 100644 --- a/apier/v1/stats.go +++ b/apier/v1/stats.go @@ -62,7 +62,23 @@ func (stsv1 *StatSV1) Call(serviceMethod string, args interface{}, reply interfa return err } -// GetLimitsForEvent returns ResourceLimits matching a specific event +// ProcessEvent returns processes a new Event func (stsv1 *StatSV1) ProcessEvent(ev engine.StatsEvent, reply *string) error { return stsv1.sts.V1ProcessEvent(ev, reply) } + +// GetQueueIDs returns the list of queues IDs in the system +func (stsv1 *StatSV1) GetQueueIDs(ignored struct{}, reply *[]string) (err error) { + return stsv1.sts.V1GetQueueIDs(ignored, reply) +} + +// GetStatMetrics returns the metrics for a queueID +func (stsv1 *StatSV1) GetStatMetrics(queueID string, reply *map[string]string) (err error) { + return stsv1.sts.V1GetStatMetrics(queueID, reply) +} + +// LoadQueues loads from dataDB into statsService the queueIDs specified +// loads all when qIDs is nil +func (stsv1 *StatSV1) LoadQueues(args stats.ArgsLoadQueues, reply *string) (err error) { + return stsv1.sts.V1LoadQueues(args, reply) +} diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go new file mode 100644 index 000000000..1e7f62f0f --- /dev/null +++ b/apier/v1/stats_it_test.go @@ -0,0 +1,153 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package v1 + +import ( + "net/rpc" + "net/rpc/jsonrpc" + "path" + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + stsV1CfgPath string + stsV1Cfg *config.CGRConfig + stsV1Rpc *rpc.Client +) + +func TestStatSV1LoadConfig(t *testing.T) { + var err error + stsV1CfgPath = path.Join(*dataDir, "conf", "samples", "stats") + if stsV1Cfg, err = config.NewCGRConfigFromFolder(stsV1CfgPath); err != nil { + t.Error(err) + } +} + +func TestStatSV1InitDataDb(t *testing.T) { + if err := engine.InitDataDb(stsV1Cfg); err != nil { + t.Fatal(err) + } +} + +/* +func TestStatSV1StartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(stsV1CfgPath, 1000); err != nil { + t.Fatal(err) + } +} +*/ + +func TestStatSV1RpcConn(t *testing.T) { + var err error + stsV1Rpc, err = jsonrpc.Dial("tcp", stsV1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +func TestStatSV1TPFromFolder(t *testing.T) { + var reply string + time.Sleep(time.Duration(2000) * time.Millisecond) + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} + if err := stsV1Rpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { + t.Error(err) + } + time.Sleep(time.Duration(1000) * time.Millisecond) +} + +func TestStatSV1GetStats(t *testing.T) { + var reply []string + // first attempt should be empty since there is no queue in cache yet + if err := stsV1Rpc.Call("StatSV1.GetQueueIDs", struct{}{}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + var metrics map[string]string + if err := stsV1Rpc.Call("StatSV1.GetStatMetrics", "Stats1", &metrics); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + var replyStr string + if err := stsV1Rpc.Call("StatSV1.LoadQueues", nil, &replyStr); err != nil { + t.Error(err) + } else if replyStr != utils.OK { + t.Errorf("reply received: %s", replyStr) + } + expectedIDs := []string{"Stats1"} + if err := stsV1Rpc.Call("StatSV1.GetQueueIDs", struct{}{}, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expectedIDs, reply) { + t.Errorf("expecting: %+v, received reply: %s", expectedIDs, reply) + } + expectedMetrics := map[string]string{} + if err := stsV1Rpc.Call("StatSV1.GetStatMetrics", "Stats1", &metrics); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expectedMetrics, metrics) { + t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics) + } +} + +func TestStatSV1ProcessEvent(t *testing.T) { + var reply string + if err := stsV1Rpc.Call("StatSV1.ProcessEvent", + engine.StatsEvent{ + utils.ID: "event1", + utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)}, + &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("received reply: %s", reply) + } + if err := stsV1Rpc.Call("StatSV1.ProcessEvent", + engine.StatsEvent{ + utils.ID: "event2", + utils.ANSWER_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)}, + &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("received reply: %s", reply) + } + if err := stsV1Rpc.Call("StatSV1.ProcessEvent", + map[string]interface{}{ + utils.ID: "event3", + utils.SETUP_TIME: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)}, + &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("received reply: %s", reply) + } + expectedMetrics := map[string]string{} + var metrics map[string]string + if err := stsV1Rpc.Call("StatSV1.GetStatMetrics", "Stats1", &metrics); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expectedMetrics, metrics) { + t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics) + } +} + +func TestStatSV1StopEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} diff --git a/general_tests/tutorial_it_test.go b/general_tests/tutorial_it_test.go index b9e866877..eda77255f 100644 --- a/general_tests/tutorial_it_test.go +++ b/general_tests/tutorial_it_test.go @@ -54,7 +54,7 @@ func TestTutITInitCfg(t *testing.T) { config.SetCgrConfig(tutFsLocalCfg) } -// Remove data in both rating and accounting db +// Remove data in dataDB func TestTutITResetDataDb(t *testing.T) { if err := engine.InitDataDb(tutFsLocalCfg); err != nil { t.Fatal(err) diff --git a/stats/service.go b/stats/service.go index 9eebe388d..b78777527 100644 --- a/stats/service.go +++ b/stats/service.go @@ -40,24 +40,17 @@ func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval tim if err != nil { return nil, err } - ss.stInsts = make(StatsInstances, len(sqPrfxs)) - for i, prfx := range sqPrfxs { - sq, err := dataDB.GetStatsQueue(prfx[len(utils.StatsQueuePrefix):], false, utils.NonTransactional) - if err != nil { - return nil, err - } - var sqSM *engine.SQStoredMetrics - if sq.Store { - if sqSM, err = dataDB.GetSQStoredMetrics(sq.ID); err != nil && err != utils.ErrNotFound { - return nil, err - } - } - if ss.stInsts[i], err = NewStatsInstance(ss.evCache, ss.ms, sq, sqSM); err != nil { + ss.queuesCache = make(map[string]*StatsInstance) + ss.queues = make(StatsInstances, 0) + for _, prfx := range sqPrfxs { + if q, err := ss.loadQueue(prfx[len(utils.StatsQueuePrefix):]); err != nil { return nil, err + } else { + ss.setQueue(q) } } - ss.stInsts.Sort() - go ss.dumpStoredMetrics() + ss.queues.Sort() + go ss.dumpStoredMetrics() // start dumpStoredMetrics loop return } @@ -68,8 +61,10 @@ type StatService struct { ms engine.Marshaler storeInterval time.Duration stopStoring chan struct{} - evCache *StatsEventCache // so we can pass it to queues - stInsts StatsInstances // ordered list of StatsQueues + evCache *StatsEventCache // so we can pass it to queues + queuesCache map[string]*StatsInstance // unordered db of StatsQueues, used for fast queries + queues StatsInstances // ordered list of StatsQueues + } // ListenAndServe loops keeps the service alive @@ -80,7 +75,7 @@ func (ss *StatService) ListenAndServe(exitChan chan bool) error { } // Called to shutdown the service -// ToDo: improve with context, ie, following http implementation +// ToDo: improve with context, ie following http implementation func (ss *StatService) Shutdown() error { utils.Logger.Info(" service shutdown initialized") close(ss.stopStoring) @@ -89,9 +84,38 @@ func (ss *StatService) Shutdown() error { return nil } +// setQueue adds or modifies a queue into cache +// sort will reorder the ss.queues +func (ss *StatService) loadQueue(qID string) (q *StatsInstance, err error) { + sq, err := ss.dataDB.GetStatsQueue(qID, false, utils.NonTransactional) + if err != nil { + return nil, err + } + var sqSM *engine.SQStoredMetrics + if sq.Store { + if sqSM, err = ss.dataDB.GetSQStoredMetrics(sq.ID); err != nil && err != utils.ErrNotFound { + return nil, err + } + } + return NewStatsInstance(ss.evCache, ss.ms, sq, sqSM) +} + +func (ss *StatService) setQueue(q *StatsInstance) { + ss.queuesCache[q.cfg.ID] = q + ss.queues = append(ss.queues, q) +} + +// remQueue will remove a queue based on it's ID +func (ss *StatService) remQueue(qID string) (si *StatsInstance) { + si = ss.queuesCache[qID] + ss.queues.remWithID(qID) + delete(ss.queuesCache, qID) + return +} + // store stores the necessary storedMetrics to dataDB func (ss *StatService) storeMetrics() { - for _, si := range ss.stInsts { + for _, si := range ss.queues { if !si.cfg.Store || !si.dirty { // no need to save continue } @@ -126,7 +150,7 @@ func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) { if evStatsID == "" { // ID is mandatory return errors.New("missing ID field") } - for _, stInst := range ss.stInsts { + for _, stInst := range ss.queues { if err := stInst.ProcessEvent(ev); err != nil { utils.Logger.Warning( fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s", @@ -143,3 +167,71 @@ func (ss *StatService) V1ProcessEvent(ev engine.StatsEvent, reply *string) (err } return } + +func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err error) { + if len(ss.queuesCache) == 0 { + return utils.ErrNotFound + } + for k := range ss.queuesCache { + *reply = append(*reply, k) + } + return +} + +func (ss *StatService) V1GetStatMetrics(queueID string, reply *map[string]string) (err error) { + sq, has := ss.queuesCache[queueID] + if !has { + return utils.ErrNotFound + } + metrics := make(map[string]string, len(sq.sqMetrics)) + for metricID, metric := range sq.sqMetrics { + metrics[metricID] = metric.GetStringValue("") + } + *reply = metrics + return +} + +type ArgsLoadQueues struct { + QueueIDs *[]string +} + +// V1LoadQueues loads the queues specified by qIDs into the service +// loads all if qIDs is nil +func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) { + qIDs := args.QueueIDs + if qIDs == nil { + sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsQueuePrefix) + if err != nil { + return err + } + queueIDs := make([]string, len(sqPrfxs)) + for i, prfx := range sqPrfxs { + queueIDs[i] = prfx[len(utils.StatsQueuePrefix):] + } + if len(queueIDs) != 0 { + qIDs = &queueIDs + } + } + if qIDs == nil || len(*qIDs) == 0 { + return utils.ErrNotFound + } + var sQs []*StatsInstance // cache here so we lock only later when data available + for _, qID := range *qIDs { + if _, hasPrev := ss.queuesCache[qID]; hasPrev { + continue // don't overwrite previous, could be extended in the future by carefully checking cached events + } + if q, err := ss.loadQueue(qID); err != nil { + return err + } else { + sQs = append(sQs, q) + } + } + ss.Lock() + for _, q := range sQs { + ss.setQueue(q) + } + ss.queues.Sort() + ss.Unlock() + *reply = utils.OK + return +} diff --git a/stats/sinstance.go b/stats/sinstance.go index 55253f1d7..31cb5104e 100644 --- a/stats/sinstance.go +++ b/stats/sinstance.go @@ -35,6 +35,18 @@ func (sis StatsInstances) Sort() { sort.Slice(sis, func(i, j int) bool { return sis[i].cfg.Weight > sis[j].cfg.Weight }) } +// remWithID removes the queue with ID from slice +func (sis StatsInstances) remWithID(qID string) { + for i, q := range sis { + if q.cfg.ID == qID { + copy(sis[i:], sis[i+1:]) + sis[len(sis)-1] = nil + sis = sis[:len(sis)-1] + break // there can be only one item with ID + } + } +} + // NewStatsInstance instantiates a StatsInstance func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler, sqCfg *engine.StatsQueue, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) { From 1054c4e88aa7d9d73c5e617d79b8ba945c4ce759 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 3 Aug 2017 15:14:16 +0200 Subject: [PATCH 2/3] Enable engine automatic start in stats test --- apier/v1/stats_it_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index 1e7f62f0f..89c579563 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -52,13 +52,11 @@ func TestStatSV1InitDataDb(t *testing.T) { } } -/* func TestStatSV1StartEngine(t *testing.T) { if _, err := engine.StopStartEngine(stsV1CfgPath, 1000); err != nil { t.Fatal(err) } } -*/ func TestStatSV1RpcConn(t *testing.T) { var err error From 7760a269ad7defebe30d644b4df2ab313716f1c4 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 3 Aug 2017 15:43:03 +0200 Subject: [PATCH 3/3] StatS - fix metrics initialization --- apier/v1/stats_it_test.go | 10 ++++++++-- data/tariffplans/tutorial/Stats.csv | 2 +- stats/sinstance.go | 13 ++++++++++--- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index 89c579563..19e5a13b5 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -98,7 +98,10 @@ func TestStatSV1GetStats(t *testing.T) { } else if !reflect.DeepEqual(expectedIDs, reply) { t.Errorf("expecting: %+v, received reply: %s", expectedIDs, reply) } - expectedMetrics := map[string]string{} + expectedMetrics := map[string]string{ + utils.MetaASR: utils.NOT_AVAILABLE, + utils.MetaACD: "", + } if err := stsV1Rpc.Call("StatSV1.GetStatMetrics", "Stats1", &metrics); err != nil { t.Error(err) } else if !reflect.DeepEqual(expectedMetrics, metrics) { @@ -135,7 +138,10 @@ func TestStatSV1ProcessEvent(t *testing.T) { } else if reply != utils.OK { t.Errorf("received reply: %s", reply) } - expectedMetrics := map[string]string{} + expectedMetrics := map[string]string{ + utils.MetaASR: "66.66667%", + utils.MetaACD: "", + } var metrics map[string]string if err := stsV1Rpc.Call("StatSV1.GetStatMetrics", "Stats1", &metrics); err != nil { t.Error(err) diff --git a/data/tariffplans/tutorial/Stats.csv b/data/tariffplans/tutorial/Stats.csv index 2013a7bc6..8fc5e3b17 100755 --- a/data/tariffplans/tutorial/Stats.csv +++ b/data/tariffplans/tutorial/Stats.csv @@ -1,2 +1,2 @@ #Id,FilterType,FilterFieldName,FilterFieldValues,ActivationInterval,QueueLength,TTL,Metrics,Blocker,Stored,Weight,Thresholds -Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd;*acc,true,true,20,THRESH1;THRESH2 +Stats1,*string,Account,1001;1002,2014-07-29T15:00:00Z,100,1s,*asr;*acd,true,true,20,THRESH1;THRESH2 diff --git a/stats/sinstance.go b/stats/sinstance.go index 31cb5104e..1ee38597c 100644 --- a/stats/sinstance.go +++ b/stats/sinstance.go @@ -50,15 +50,22 @@ func (sis StatsInstances) remWithID(qID string) { // NewStatsInstance instantiates a StatsInstance func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler, sqCfg *engine.StatsQueue, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) { - si = &StatsInstance{sec: sec, ms: ms, cfg: sqCfg} + si = &StatsInstance{sec: sec, ms: ms, cfg: sqCfg, sqMetrics: make(map[string]StatsMetric)} + for _, metricID := range sqCfg.Metrics { + if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { + return + } + } if sqSM != nil { for evID, ev := range sqSM.SEvents { si.sec.Cache(evID, ev, si.cfg.ID) } si.sqItems = sqSM.SQItems for metricID := range si.sqMetrics { - if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { - return + if _, has := si.sqMetrics[metricID]; !has { + if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { + return + } } if stored, has := sqSM.SQMetrics[metricID]; !has { continue