diff --git a/engine/libstats_test.go b/engine/libstats_test.go new file mode 100644 index 000000000..0e3230d7b --- /dev/null +++ b/engine/libstats_test.go @@ -0,0 +1,42 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package engine + +import ( + "reflect" + "testing" +) + +func TestStatQueuesSort(t *testing.T) { + sInsts := StatQueues{ + &StatQueue{sqPrfl: &StatQueueProfile{ID: "FIRST", Weight: 30.0}}, + &StatQueue{sqPrfl: &StatQueueProfile{ID: "SECOND", Weight: 40.0}}, + &StatQueue{sqPrfl: &StatQueueProfile{ID: "THIRD", Weight: 30.0}}, + &StatQueue{sqPrfl: &StatQueueProfile{ID: "FOURTH", Weight: 35.0}}, + } + sInsts.Sort() + eSInst := StatQueues{ + &StatQueue{sqPrfl: &StatQueueProfile{ID: "SECOND", Weight: 40.0}}, + &StatQueue{sqPrfl: &StatQueueProfile{ID: "FOURTH", Weight: 35.0}}, + &StatQueue{sqPrfl: &StatQueueProfile{ID: "FIRST", Weight: 30.0}}, + &StatQueue{sqPrfl: &StatQueueProfile{ID: "THIRD", Weight: 30.0}}, + } + if !reflect.DeepEqual(eSInst, sInsts) { + t.Errorf("expecting: %+v, received: %+v", eSInst, sInsts) + } +} diff --git a/engine/statmetrics.go b/engine/statmetrics.go new file mode 100644 index 000000000..71cca8fdc --- /dev/null +++ b/engine/statmetrics.go @@ -0,0 +1,149 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "fmt" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +// NewStatsMetrics instantiates the StatsMetrics +// cfg serves as general purpose container to pass config options to metric +func NewStatsMetric(metricID string) (sm StatsMetric, err error) { + metrics := map[string]func() (StatsMetric, error){ + utils.MetaASR: NewASR, + utils.MetaACD: NewACD, + } + if _, has := metrics[metricID]; !has { + return nil, fmt.Errorf("unsupported metric: %s", metricID) + } + return metrics[metricID]() +} + +// StatsMetric is the interface which a metric should implement +type StatsMetric interface { + GetValue() interface{} + GetStringValue(fmtOpts string) (val string) + GetFloat64Value() (val float64) + AddEvent(ev StatsEvent) error + RemEvent(ev StatsEvent) error + GetMarshaled(ms Marshaler) (vals []byte, err error) + SetFromMarshaled(vals []byte, ms Marshaler) (err error) // mostly used to load from DB +} + +func NewASR() (StatsMetric, error) { + return new(ASRStat), nil +} + +// ASR implements AverageSuccessRatio metric +type ASRStat struct { + Answered float64 + Count float64 +} + +func (asr *ASRStat) GetValue() (v interface{}) { + if asr.Count == 0 { + return float64(STATS_NA) + } + return utils.Round((asr.Answered / asr.Count * 100), + config.CgrConfig().RoundingDecimals, utils.ROUNDING_MIDDLE) +} + +func (asr *ASRStat) GetStringValue(fmtOpts string) (valStr string) { + if asr.Count == 0 { + return utils.NOT_AVAILABLE + } + val := asr.GetValue().(float64) + return fmt.Sprintf("%v%%", val) // %v will automatically limit the number of decimals printed +} + +func (asr *ASRStat) GetFloat64Value() (val float64) { + return asr.GetValue().(float64) +} + +func (asr *ASRStat) AddEvent(ev StatsEvent) (err error) { + if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil && + err != utils.ErrNotFound { + return err + } else if !at.IsZero() { + asr.Answered += 1 + } + asr.Count += 1 + return +} + +func (asr *ASRStat) RemEvent(ev StatsEvent) (err error) { + if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil && + err != utils.ErrNotFound { + return err + } else if !at.IsZero() { + asr.Answered -= 1 + } + asr.Count -= 1 + return +} + +func (asr *ASRStat) GetMarshaled(ms Marshaler) (vals []byte, err error) { + return ms.Marshal(asr) +} + +func (asr *ASRStat) SetFromMarshaled(vals []byte, ms Marshaler) (err error) { + return ms.Unmarshal(vals, asr) +} + +func NewACD() (StatsMetric, error) { + return new(ACDStat), nil +} + +// ACD implements AverageCallDuration metric +type ACDStat struct { + Sum time.Duration + Count int +} + +func (acd *ACDStat) GetStringValue(fmtOpts string) (val string) { + return +} + +func (acd *ACDStat) GetValue() (v interface{}) { + return +} + +func (acd *ACDStat) GetFloat64Value() (v float64) { + return float64(STATS_NA) +} + +func (acd *ACDStat) AddEvent(ev StatsEvent) (err error) { + return +} + +func (acd *ACDStat) RemEvent(ev StatsEvent) (err error) { + return +} + +func (acd *ACDStat) GetMarshaled(ms Marshaler) (vals []byte, err error) { + return +} + +func (acd *ACDStat) SetFromMarshaled(vals []byte, ms Marshaler) (err error) { + return +} diff --git a/engine/statmetrics_test.go b/engine/statmetrics_test.go new file mode 100644 index 000000000..87beed89d --- /dev/null +++ b/engine/statmetrics_test.go @@ -0,0 +1,86 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package engine + +/* +import ( + "testing" + "time" + + "github.com/cgrates/cgrates/utils" +) + +func TestASRGetStringValue(t *testing.T) { + asr, _ := NewASR() + if strVal := asr.GetStringValue(""); strVal != utils.NOT_AVAILABLE { + t.Errorf("wrong asr value: %s", strVal) + } + ev := engine.StatsEvent{ + "AnswerTime": time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)} + asr.AddEvent(ev) + if strVal := asr.GetStringValue(""); strVal != "100%" { + t.Errorf("wrong asr value: %s", strVal) + } + asr.AddEvent(engine.StatsEvent{}) + asr.AddEvent(engine.StatsEvent{}) + if strVal := asr.GetStringValue(""); strVal != "33.33333%" { + t.Errorf("wrong asr value: %s", strVal) + } + asr.RemEvent(engine.StatsEvent{}) + if strVal := asr.GetStringValue(""); strVal != "50%" { + t.Errorf("wrong asr value: %s", strVal) + } + asr.RemEvent(ev) + if strVal := asr.GetStringValue(""); strVal != "0%" { + t.Errorf("wrong asr value: %s", strVal) + } + asr.RemEvent(engine.StatsEvent{}) + if strVal := asr.GetStringValue(""); strVal != utils.NOT_AVAILABLE { + t.Errorf("wrong asr value: %s", strVal) + } + +} + +func TestASRGetValue(t *testing.T) { + asr, _ := NewASR() + ev := engine.StatsEvent{ + "AnswerTime": time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + } + asr.AddEvent(ev) + if v := asr.GetValue(); v != 100.0 { + t.Errorf("wrong asr value: %f", v) + } + asr.AddEvent(engine.StatsEvent{}) + asr.AddEvent(engine.StatsEvent{}) + if v := asr.GetValue(); v != 33.33333 { + t.Errorf("wrong asr value: %f", v) + } + asr.RemEvent(engine.StatsEvent{}) + if v := asr.GetValue(); v != 50.0 { + t.Errorf("wrong asr value: %f", v) + } + asr.RemEvent(ev) + if v := asr.GetValue(); v != 0.0 { + t.Errorf("wrong asr value: %f", v) + } + asr.RemEvent(engine.StatsEvent{}) + if v := asr.GetValue(); v != -1.0 { + t.Errorf("wrong asr value: %f", v) + } +} +*/ diff --git a/engine/stats.go b/engine/stats.go new file mode 100644 index 000000000..18b775b26 --- /dev/null +++ b/engine/stats.go @@ -0,0 +1,262 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "math/rand" + "time" +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +/* +// NewStatService initializes a StatService +func NewStatService(dataDB DataDB, ms Marshaler, storeInterval time.Duration) (ss *StatService, err error) { + ss = &StatService{dataDB: dataDB, ms: ms, storeInterval: storeInterval, + stopStoring: make(chan struct{})} + sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsConfigPrefix) + if err != nil { + return nil, err + } + go ss.dumpStoredMetrics() // start dumpStoredMetrics loop + return +} + +// StatService builds stats for events +type StatService struct { + dataDB DataDB + ms Marshaler + storeInterval time.Duration + stopStoring chan struct{} +} + +// ListenAndServe loops keeps the service alive +func (ss *StatService) ListenAndServe(exitChan chan bool) error { + e := <-exitChan + exitChan <- e // put back for the others listening for shutdown request + return nil +} + +// Called to shutdown the service +// ToDo: improve with context, ie following http implementation +func (ss *StatService) Shutdown() error { + utils.Logger.Info(" service shutdown initialized") + close(ss.stopStoring) + ss.storeMetrics() + utils.Logger.Info(" service shutdown complete") + return nil +} + +// setQueue adds or modifies a queue into cache +// sort will reorder the ss.queues +func (ss *StatService) loadQueue(qID string) (q *StatQueue, err error) { + sq, err := ss.dataDB.GetStatsConfig(qID) + if err != nil { + return nil, err + } + return NewStatQueue(ss.evCache, ss.ms, sq, sqSM) +} + +func (ss *StatService) setQueue(q *StatQueue) { + ss.queuesCache[q.cfg.ID] = q + ss.queues = append(ss.queues, q) +} + +// remQueue will remove a queue based on it's ID +func (ss *StatService) remQueue(qID string) (si *StatQueue) { + si = ss.queuesCache[qID] + ss.queues.remWithID(qID) + delete(ss.queuesCache, qID) + return +} + +// store stores the necessary storedMetrics to dataDB +func (ss *StatService) storeMetrics() { + for _, si := range ss.queues { + if !si.cfg.Store || !si.dirty { // no need to save + continue + } + if siSM := si.GetStoredMetrics(); siSM != nil { + if err := ss.dataDB.SetSQStoredMetrics(siSM); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" failed saving StoredMetrics for QueueID: %s, error: %s", + si.cfg.ID, err.Error())) + } + } + // randomize the CPU load and give up thread control + time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) + } + return +} + +// dumpStoredMetrics regularly dumps metrics to dataDB +func (ss *StatService) dumpStoredMetrics() { + for { + select { + case <-ss.stopStoring: + return + } + ss.storeMetrics() + time.Sleep(ss.storeInterval) + } +} + +// processEvent processes a StatsEvent through the queues and caches it when needed +func (ss *StatService) processEvent(ev StatsEvent) (err error) { + evStatsID := ev.ID() + if evStatsID == "" { // ID is mandatory + return errors.New("missing ID field") + } + for _, stInst := range ss.queues { + if err := stInst.ProcessEvent(ev); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s", + stInst.cfg.ID, evStatsID, err.Error())) + } + if stInst.cfg.Blocker { + break + } + } + return +} + +// V1ProcessEvent implements StatV1 method for processing an Event +func (ss *StatService) V1ProcessEvent(ev StatsEvent, reply *string) (err error) { + if err = ss.processEvent(ev); err == nil { + *reply = utils.OK + } + return +} + +// V1GetQueueIDs returns list of queue IDs configured in the service +func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err error) { + if len(ss.queuesCache) == 0 { + return utils.ErrNotFound + } + for k := range ss.queuesCache { + *reply = append(*reply, k) + } + return +} + +// V1GetStringMetrics returns the metrics as string values +func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]string) (err error) { + sq, has := ss.queuesCache[queueID] + if !has { + return utils.ErrNotFound + } + metrics := make(map[string]string, len(sq.sqMetrics)) + for metricID, metric := range sq.sqMetrics { + metrics[metricID] = metric.GetStringValue("") + } + *reply = metrics + return +} + +// V1GetFloatMetrics returns the metrics as float64 values +func (ss *StatService) V1GetFloatMetrics(queueID string, reply *map[string]float64) (err error) { + sq, has := ss.queuesCache[queueID] + if !has { + return utils.ErrNotFound + } + metrics := make(map[string]float64, len(sq.sqMetrics)) + for metricID, metric := range sq.sqMetrics { + metrics[metricID] = metric.GetFloat64Value() + } + *reply = metrics + return +} + +// ArgsLoadQueues are the arguments passed to V1LoadQueues +type ArgsLoadQueues struct { + QueueIDs *[]string +} + +// V1LoadQueues loads the queues specified by qIDs into the service +// loads all if args.QueueIDs is nil +func (ss *StatService) V1LoadQueues(args ArgsLoadQueues, reply *string) (err error) { + qIDs := args.QueueIDs + if qIDs == nil { + sqPrfxs, err := ss.dataDB.GetKeysForPrefix(utils.StatsConfigPrefix) + if err != nil { + return err + } + queueIDs := make([]string, len(sqPrfxs)) + for i, prfx := range sqPrfxs { + queueIDs[i] = prfx[len(utils.StatsConfigPrefix):] + } + if len(queueIDs) != 0 { + qIDs = &queueIDs + } + } + if qIDs == nil || len(*qIDs) == 0 { + return utils.ErrNotFound + } + var sQs []*StatQueue // cache here so we lock only later when data available + for _, qID := range *qIDs { + if _, hasPrev := ss.queuesCache[qID]; hasPrev { + continue // don't overwrite previous, could be extended in the future by carefully checking cached events + } + if q, err := ss.loadQueue(qID); err != nil { + utils.Logger.Err(fmt.Sprintf(" failed loading quueue with id: <%s>, err: <%s>", + q.cfg.ID, err.Error())) + continue + } else { + sQs = append(sQs, q) + } + } + ss.Lock() + for _, q := range sQs { + ss.setQueue(q) + } + ss.queues.Sort() + ss.Unlock() + *reply = utils.OK + return +} + +// Call implements rpcclient.RpcClientConnection interface for internal RPC +// here for testing purposes +func (ss *StatService) Call(serviceMethod string, args interface{}, reply interface{}) error { + methodSplit := strings.Split(serviceMethod, ".") + if len(methodSplit) != 2 { + return rpcclient.ErrUnsupporteServiceMethod + } + method := reflect.ValueOf(ss).MethodByName(methodSplit[0][len(methodSplit[0])-2:] + methodSplit[1]) + if !method.IsValid() { + return rpcclient.ErrUnsupporteServiceMethod + } + params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)} + ret := method.Call(params) + if len(ret) != 1 { + return utils.ErrServerError + } + if ret[0].Interface() == nil { + return nil + } + err, ok := ret[0].Interface().(error) + if !ok { + return utils.ErrServerError + } + return err +} + +*/ diff --git a/engine/stats_test.go b/engine/stats_test.go new file mode 100644 index 000000000..3cc424c9e --- /dev/null +++ b/engine/stats_test.go @@ -0,0 +1,78 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package engine + +/* +import ( + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +func TestReqFilterPassStatS(t *testing.T) { + if cgrCfg := config.CgrConfig(); cgrCfg == nil { + cgrCfg, _ = config.NewDefaultCGRConfig() + config.SetCgrConfig(cgrCfg) + } + dataStorage, _ := engine.NewMapStorage() + dataStorage.SetStatsConfig( + &engine.StatsConfig{ID: "CDRST1", + Filters: []*engine.RequestFilter{ + &engine.RequestFilter{Type: engine.MetaString, FieldName: "Tenant", + Values: []string{"cgrates.org"}}}, + Metrics: []string{utils.MetaASR}}) + statS, err := NewStatService(dataStorage, dataStorage.Marshaler(), 0) + if err != nil { + t.Fatal(err) + } + var replyStr string + if err := statS.Call("StatSV1.LoadQueues", ArgsLoadQueues{}, + &replyStr); err != nil { + t.Error(err) + } else if replyStr != utils.OK { + t.Errorf("reply received: %s", replyStr) + } + cdr := &engine.CDR{ + Tenant: "cgrates.org", + Category: "call", + AnswerTime: time.Now(), + SetupTime: time.Now(), + Usage: 10 * time.Second, + Cost: 10, + Supplier: "suppl1", + DisconnectCause: "NORMAL_CLEARNING", + } + cdrMp, _ := cdr.AsMapStringIface() + cdrMp[utils.ID] = "event1" + if err := statS.processEvent(cdrMp); err != nil { + t.Error(err) + } + rf, err := engine.NewRequestFilter(engine.MetaStatS, "", + []string{"CDRST1:*min_asr:20"}) + if err != nil { + t.Fatal(err) + } + if passes, err := rf.Pass(cdr, "", statS); err != nil { + t.Error(err) + } else if !passes { + t.Error("Not passing") + } +} +*/