diff --git a/data/tutorials/osips_native/opensips/etc/opensips/opensips.cfg b/data/tutorials/osips_native/opensips/etc/opensips/opensips.cfg index e9530654a..5775653ca 100644 --- a/data/tutorials/osips_native/opensips/etc/opensips/opensips.cfg +++ b/data/tutorials/osips_native/opensips/etc/opensips/opensips.cfg @@ -76,7 +76,7 @@ loadmodule "dialog.so" #### CGRateS module loadmodule "cgrates.so" -modparam("cgrates", "cgrates_engine", "127.0.0.1:2012") +modparam("cgrates", "cgrates_engine", "127.0.0.1:2014") #### UDP protocol diff --git a/engine/statsqueue.go b/engine/statsqueue.go index f6c9e1403..86221e7fe 100755 --- a/engine/statsqueue.go +++ b/engine/statsqueue.go @@ -18,6 +18,7 @@ along with this program. If not, see package engine import ( + "errors" "time" "github.com/cgrates/cgrates/utils" @@ -37,16 +38,6 @@ type SQStoredMetrics struct { SQMetrics map[string][]byte } -// StatsEvent is an event received by StatService -type StatsEvent map[string]interface{} - -func (se StatsEvent) ID() (id string) { - if sID, has := se[utils.ID]; has { - id = sID.(string) - } - return -} - // StatsQueue represents the configuration of a StatsInstance in StatS type StatsQueue struct { ID string // QueueID @@ -59,3 +50,29 @@ type StatsQueue struct { Thresholds []string // list of thresholds to be checked after changes Weight float64 } + +// StatsEvent is an event received by StatService +type StatsEvent map[string]interface{} + +func (se StatsEvent) ID() (id string) { + if sID, has := se[utils.ID]; has { + id = sID.(string) + } + return +} + +// AnswerTime returns the AnswerTime of StatsEvent +func (se StatsEvent) AnswerTime(timezone string) (at time.Time, err error) { + atIf, has := se[utils.ANSWER_TIME] + if !has { + return at, utils.ErrNotFound + } + if at, canCast := atIf.(time.Time); canCast { + return at, nil + } + atStr, canCast := atIf.(string) + if !canCast { + return at, errors.New("cannot cast to string") + } + return utils.ParseTimeDetectLayout(atStr, timezone) +} diff --git a/engine/thresholds.go b/engine/thresholds.go new file mode 100644 index 000000000..c5f56adc8 --- /dev/null +++ b/engine/thresholds.go @@ -0,0 +1,34 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +type ThresholdCfg struct { + ID string + Filters []*RequestFilter // Filters for the request + ActivationInterval *utils.ActivationInterval // Time when this limit becomes active and expires + ThresholdType string + ThresholdValue float64 // threshold value + Recurrent bool + MinSleep time.Duration + MinItems int // number of items agregated for the threshold to match + Blocker bool // blocker flag to stop processing on filters matched + Stored bool + Weight float64 // Weight to sort the thresholds + ActionIDs []string +} diff --git a/stats/acd.go b/stats/acd.go new file mode 100644 index 000000000..361e56f59 --- /dev/null +++ b/stats/acd.go @@ -0,0 +1,59 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package stats + +import ( + "time" + + "github.com/cgrates/cgrates/engine" +) + +func NewACD() (StatsMetric, error) { + return new(ACD), nil +} + +// ACD implements AverageCallDuration metric +type ACD struct { + Sum time.Duration + Count int +} + +func (acd *ACD) GetStringValue(fmtOpts string) (val string) { + return +} + +func (acd *ACD) GetValue() (v interface{}) { + return +} + +func (acd *ACD) AddEvent(ev engine.StatsEvent) (err error) { + return +} + +func (acd *ACD) RemEvent(ev engine.StatsEvent) (err error) { + return +} + +func (acd *ACD) GetMarshaled(ms engine.Marshaler) (vals []byte, err error) { + return +} + +func (acd *ACD) SetFromMarshaled(vals []byte, ms engine.Marshaler) (err error) { + return +} diff --git a/stats/asr.go b/stats/asr.go new file mode 100644 index 000000000..224c2609e --- /dev/null +++ b/stats/asr.go @@ -0,0 +1,83 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package stats + +import ( + "fmt" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func NewASR() (StatsMetric, error) { + return new(ASR), nil +} + +// ASR implements AverageSuccessRatio metric +type ASR struct { + Answered float64 + Count float64 +} + +func (asr *ASR) GetStringValue(fmtOpts string) (valStr string) { + if asr.Count == 0 { + return utils.NOT_AVAILABLE + } + val := asr.GetValue().(float64) + return fmt.Sprintf("%v%%", val) // %v will automatically limit the number of decimals printed +} + +func (asr *ASR) GetValue() (v interface{}) { + if asr.Count == 0 { + return float64(engine.STATS_NA) + } + return utils.Round((asr.Answered / asr.Count * 100), + config.CgrConfig().RoundingDecimals, utils.ROUNDING_MIDDLE) +} + +func (asr *ASR) AddEvent(ev engine.StatsEvent) (err error) { + if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil && + err != utils.ErrNotFound { + return err + } else if !at.IsZero() { + asr.Answered += 1 + } + asr.Count += 1 + return +} + +func (asr *ASR) RemEvent(ev engine.StatsEvent) (err error) { + if at, err := ev.AnswerTime(config.CgrConfig().DefaultTimezone); err != nil && + err != utils.ErrNotFound { + return err + } else if !at.IsZero() { + asr.Answered -= 1 + } + asr.Count -= 1 + return +} + +func (asr *ASR) GetMarshaled(ms engine.Marshaler) (vals []byte, err error) { + return ms.Marshal(asr) +} + +func (asr *ASR) SetFromMarshaled(vals []byte, ms engine.Marshaler) (err error) { + return ms.Unmarshal(vals, asr) +} diff --git a/stats/asr_test.go b/stats/asr_test.go new file mode 100644 index 000000000..c0e1cb136 --- /dev/null +++ b/stats/asr_test.go @@ -0,0 +1,85 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package stats + +import ( + "testing" + "time" + + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func TestASRGetStringValue(t *testing.T) { + asr, _ := NewASR() + if strVal := asr.GetStringValue(""); strVal != utils.NOT_AVAILABLE { + t.Errorf("wrong asr value: %s", strVal) + } + ev := engine.StatsEvent{ + "AnswerTime": time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)} + asr.AddEvent(ev) + if strVal := asr.GetStringValue(""); strVal != "100%" { + t.Errorf("wrong asr value: %s", strVal) + } + asr.AddEvent(engine.StatsEvent{}) + asr.AddEvent(engine.StatsEvent{}) + if strVal := asr.GetStringValue(""); strVal != "33.33333%" { + t.Errorf("wrong asr value: %s", strVal) + } + asr.RemEvent(engine.StatsEvent{}) + if strVal := asr.GetStringValue(""); strVal != "50%" { + t.Errorf("wrong asr value: %s", strVal) + } + asr.RemEvent(ev) + if strVal := asr.GetStringValue(""); strVal != "0%" { + t.Errorf("wrong asr value: %s", strVal) + } + asr.RemEvent(engine.StatsEvent{}) + if strVal := asr.GetStringValue(""); strVal != utils.NOT_AVAILABLE { + t.Errorf("wrong asr value: %s", strVal) + } + +} + +func TestASRGetValue(t *testing.T) { + asr, _ := NewASR() + ev := engine.StatsEvent{ + "AnswerTime": time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + } + asr.AddEvent(ev) + if v := asr.GetValue(); v != 100.0 { + t.Errorf("wrong asr value: %f", v) + } + asr.AddEvent(engine.StatsEvent{}) + asr.AddEvent(engine.StatsEvent{}) + if v := asr.GetValue(); v != 33.33333 { + t.Errorf("wrong asr value: %f", v) + } + asr.RemEvent(engine.StatsEvent{}) + if v := asr.GetValue(); v != 50.0 { + t.Errorf("wrong asr value: %f", v) + } + asr.RemEvent(ev) + if v := asr.GetValue(); v != 0.0 { + t.Errorf("wrong asr value: %f", v) + } + asr.RemEvent(engine.StatsEvent{}) + if v := asr.GetValue(); v != -1.0 { + t.Errorf("wrong asr value: %f", v) + } +} diff --git a/stats/metrics.go b/stats/metric.go similarity index 55% rename from stats/metrics.go rename to stats/metric.go index 2442fcd09..abd01a31b 100644 --- a/stats/metrics.go +++ b/stats/metric.go @@ -26,10 +26,11 @@ import ( ) // NewStatsMetrics instantiates the StatsMetrics +// cfg serves as general purpose container to pass config options to metric func NewStatsMetric(metricID string) (sm StatsMetric, err error) { metrics := map[string]func() (StatsMetric, error){ - utils.MetaASR: NewStatsASR, - utils.MetaACD: NewStatsACD, + utils.MetaASR: NewASR, + utils.MetaACD: NewACD, } if _, has := metrics[metricID]; !has { return nil, fmt.Errorf("unsupported metric: %s", metricID) @@ -40,65 +41,9 @@ func NewStatsMetric(metricID string) (sm StatsMetric, err error) { // StatsMetric is the interface which a metric should implement type StatsMetric interface { GetStringValue(fmtOpts string) (val string) + GetValue() interface{} AddEvent(ev engine.StatsEvent) error RemEvent(ev engine.StatsEvent) error GetMarshaled(ms engine.Marshaler) (vals []byte, err error) SetFromMarshaled(vals []byte, ms engine.Marshaler) (err error) // mostly used to load from DB } - -func NewStatsASR() (StatsMetric, error) { - return new(StatsASR), nil -} - -// StatsASR implements AverageSuccessRatio metric -type StatsASR struct { - answered int - count int -} - -func (asr *StatsASR) GetStringValue(fmtOpts string) (val string) { - return -} - -func (asr *StatsASR) AddEvent(ev engine.StatsEvent) (err error) { - return -} - -func (asr *StatsASR) RemEvent(ev engine.StatsEvent) (err error) { - return -} - -func (asr *StatsASR) GetMarshaled(ms engine.Marshaler) (vals []byte, err error) { - return -} - -func (asr *StatsASR) SetFromMarshaled(vals []byte, ms engine.Marshaler) (err error) { - return -} - -func NewStatsACD() (StatsMetric, error) { - return new(StatsACD), nil -} - -// StatsACD implements AverageCallDuration metric -type StatsACD struct{} - -func (acd *StatsACD) GetStringValue(fmtOpts string) (val string) { - return -} - -func (acd *StatsACD) AddEvent(ev engine.StatsEvent) (err error) { - return -} - -func (acd *StatsACD) RemEvent(ev engine.StatsEvent) (err error) { - return -} - -func (acd *StatsACD) GetMarshaled(ms engine.Marshaler) (vals []byte, err error) { - return -} - -func (acd *StatsACD) SetFromMarshaled(vals []byte, ms engine.Marshaler) (err error) { - return -} diff --git a/stats/service.go b/stats/service.go index 7142d505a..e48e7509d 100644 --- a/stats/service.go +++ b/stats/service.go @@ -20,24 +20,56 @@ package stats import ( "errors" "fmt" + "math/rand" "sync" + "time" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) +func init() { + rand.Seed(time.Now().UnixNano()) +} + // NewStatService initializes a StatService -func NewStatService(dataDB engine.DataDB) (ss *StatService, err error) { - ss = &StatService{dataDB: dataDB} +func NewStatService(dataDB engine.DataDB, ms engine.Marshaler, storeInterval time.Duration) (ss *StatService, err error) { + ss = &StatService{dataDB: dataDB, ms: ms, storeInterval: storeInterval, + stopStoring: make(chan struct{}), evCache: NewStatsEventCache()} + sqPrfxs, err := dataDB.GetKeysForPrefix(utils.StatsQueuePrefix) + if err != nil { + return nil, err + } + ss.stInsts = make(StatsInstances, len(sqPrfxs)) + for i, prfx := range sqPrfxs { + sq, err := dataDB.GetStatsQueue(prfx[len(utils.StatsQueuePrefix):], false, utils.NonTransactional) + if err != nil { + return nil, err + } + var sqSM *engine.SQStoredMetrics + if sq.Store { + if sqSM, err = dataDB.GetSQStoredMetrics(sq.ID); err != nil && err != utils.ErrNotFound { + return nil, err + } + } + if ss.stInsts[i], err = NewStatsInstance(ss.evCache, ss.ms, sq, sqSM); err != nil { + return nil, err + } + } + ss.stInsts.Sort() + go ss.dumpStoredMetrics() return } // StatService builds stats for events type StatService struct { sync.RWMutex - dataDB engine.DataDB - stQueues StatsInstances // ordered list of StatsQueues - evCache *StatsEventCache // so we can pass it to queues + dataDB engine.DataDB + ms engine.Marshaler + storeInterval time.Duration + stopStoring chan struct{} + evCache *StatsEventCache // so we can pass it to queues + stInsts StatsInstances // ordered list of StatsQueues } // Called to start the service @@ -47,6 +79,8 @@ func (ss *StatService) ListenAndServe() error { // Called to shutdown the service func (ss *StatService) ServiceShutdown() error { + close(ss.stopStoring) + ss.storeMetrics() return nil } @@ -56,16 +90,43 @@ func (ss *StatService) processEvent(ev engine.StatsEvent) (err error) { if evStatsID == "" { // ID is mandatory return errors.New("missing ID field") } - for _, stQ := range ss.stQueues { - if err := stQ.ProcessEvent(ev); err != nil { - utils.Logger.Warning(fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s", - stQ.cfg.ID, evStatsID, err.Error())) + for _, stInst := range ss.stInsts { + if err := stInst.ProcessEvent(ev); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s", + stInst.cfg.ID, evStatsID, err.Error())) } } return } -// store stores the necessary data to DB -func (ss *StatService) store() (err error) { +// store stores the necessary storedMetrics to dataDB +func (ss *StatService) storeMetrics() { + for _, si := range ss.stInsts { + if !si.cfg.Store || !si.dirty { // no need to save + continue + } + if siSM := si.GetStoredMetrics(); siSM != nil { + if err := ss.dataDB.SetSQStoredMetrics(siSM); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" failed saving StoredMetrics for QueueID: %s, error: %s", + si.cfg.ID, err.Error())) + } + } + // randomize the CPU load and give up thread control + time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) + } return } + +// dumpStoredMetrics regularly dumps metrics to dataDB +func (ss *StatService) dumpStoredMetrics() { + for { + select { + case <-ss.stopStoring: + return + } + ss.storeMetrics() + time.Sleep(ss.storeInterval) + } +} diff --git a/stats/sinstance.go b/stats/sinstance.go index 94a325c81..55253f1d7 100644 --- a/stats/sinstance.go +++ b/stats/sinstance.go @@ -19,6 +19,7 @@ package stats import ( "fmt" + "sort" "sync" "time" @@ -26,8 +27,37 @@ import ( "github.com/cgrates/cgrates/utils" ) +// StatsInstances is a sortable list of StatsInstance type StatsInstances []*StatsInstance +// Sort is part of sort interface, sort based on Weight +func (sis StatsInstances) Sort() { + sort.Slice(sis, func(i, j int) bool { return sis[i].cfg.Weight > sis[j].cfg.Weight }) +} + +// NewStatsInstance instantiates a StatsInstance +func NewStatsInstance(sec *StatsEventCache, ms engine.Marshaler, + sqCfg *engine.StatsQueue, sqSM *engine.SQStoredMetrics) (si *StatsInstance, err error) { + si = &StatsInstance{sec: sec, ms: ms, cfg: sqCfg} + if sqSM != nil { + for evID, ev := range sqSM.SEvents { + si.sec.Cache(evID, ev, si.cfg.ID) + } + si.sqItems = sqSM.SQItems + for metricID := range si.sqMetrics { + if si.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { + return + } + if stored, has := sqSM.SQMetrics[metricID]; !has { + continue + } else if err = si.sqMetrics[metricID].SetFromMarshaled(stored, ms); err != nil { + return + } + } + } + return +} + // StatsInstance represents an individual stats instance type StatsInstance struct { sync.RWMutex @@ -39,30 +69,6 @@ type StatsInstance struct { cfg *engine.StatsQueue } -// Init prepares a StatsInstance for operations -// Should be executed at server start -func (sq *StatsInstance) Init(sec *StatsEventCache, ms engine.Marshaler, sqSM *engine.SQStoredMetrics) (err error) { - sq.sec = sec - if sqSM == nil { - return - } - for evID, ev := range sqSM.SEvents { - sq.sec.Cache(evID, ev, sq.cfg.ID) - } - sq.sqItems = sqSM.SQItems - for metricID := range sq.sqMetrics { - if sq.sqMetrics[metricID], err = NewStatsMetric(metricID); err != nil { - return - } - if stored, has := sqSM.SQMetrics[metricID]; !has { - continue - } else if err = sq.sqMetrics[metricID].SetFromMarshaled(stored, ms); err != nil { - return - } - } - return -} - // GetSQStoredMetrics retrieves the data used for store to DB func (sq *StatsInstance) GetStoredMetrics() (sqSM *engine.SQStoredMetrics) { sq.RLock() diff --git a/stats/sinstance_test.go b/stats/sinstance_test.go new file mode 100644 index 000000000..7735d3643 --- /dev/null +++ b/stats/sinstance_test.go @@ -0,0 +1,44 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package stats + +import ( + "reflect" + "testing" + + "github.com/cgrates/cgrates/engine" +) + +func TestStatsInstancesSort(t *testing.T) { + sInsts := StatsInstances{ + &StatsInstance{cfg: &engine.StatsQueue{ID: "FIRST", Weight: 30.0}}, + &StatsInstance{cfg: &engine.StatsQueue{ID: "SECOND", Weight: 40.0}}, + &StatsInstance{cfg: &engine.StatsQueue{ID: "THIRD", Weight: 30.0}}, + &StatsInstance{cfg: &engine.StatsQueue{ID: "FOURTH", Weight: 35.0}}, + } + sInsts.Sort() + eSInst := StatsInstances{ + &StatsInstance{cfg: &engine.StatsQueue{ID: "SECOND", Weight: 40.0}}, + &StatsInstance{cfg: &engine.StatsQueue{ID: "FOURTH", Weight: 35.0}}, + &StatsInstance{cfg: &engine.StatsQueue{ID: "FIRST", Weight: 30.0}}, + &StatsInstance{cfg: &engine.StatsQueue{ID: "THIRD", Weight: 30.0}}, + } + if !reflect.DeepEqual(eSInst, sInsts) { + t.Errorf("expecting: %+v, received: %+v", eSInst, sInsts) + } +}