Updated caps stats to use cache onEvict function

This commit is contained in:
Trial97
2020-11-25 17:03:34 +02:00
committed by Dan Christian Bogos
parent 0b50c4763a
commit 128e678ef2
27 changed files with 360 additions and 294 deletions

View File

@@ -33,7 +33,7 @@ import (
var Cache *CacheS
func init() {
Cache = NewCacheS(config.CgrConfig(), nil)
Cache = NewCacheS(config.CgrConfig(), nil, nil)
// Threshold
gob.Register(new(Threshold))
gob.Register(new(ThresholdProfile))
@@ -88,13 +88,14 @@ func SetCache(chS *CacheS) {
}
// NewCacheS initializes the Cache service and executes the precaching
func NewCacheS(cfg *config.CGRConfig, dm *DataManager) (c *CacheS) {
func NewCacheS(cfg *config.CGRConfig, dm *DataManager, cpS *CapsStats) (c *CacheS) {
cfg.CacheCfg().AddTmpCaches()
tCache := cfg.CacheCfg().AsTransCacheConfig()
if len(cfg.CacheCfg().ReplicationConns) != 0 {
var reply string
for k, val := range tCache {
if !cfg.CacheCfg().Partitions[k].Replicate {
if !cfg.CacheCfg().Partitions[k].Replicate ||
k == utils.CacheCapsEvents {
continue
}
val.OnEvicted = func(itmID string, value interface{}) {
@@ -109,6 +110,9 @@ func NewCacheS(cfg *config.CGRConfig, dm *DataManager) (c *CacheS) {
}
}
if _, has := tCache[utils.CacheCapsEvents]; has && cpS != nil {
tCache[utils.CacheCapsEvents].OnEvicted = cpS.OnEvict
}
c = &CacheS{
cfg: cfg,
dm: dm,

139
engine/caps.go Normal file
View File

@@ -0,0 +1,139 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"net"
"strconv"
"sync"
"time"
"github.com/cgrates/cgrates/utils"
)
// Caps the structure that allocs requests for API
type Caps struct {
strategy string
aReqs chan struct{}
}
// NewCaps creates a new caps
func NewCaps(reqs int, strategy string) *Caps {
return &Caps{
strategy: strategy,
aReqs: make(chan struct{}, reqs),
}
}
// IsLimited returns true if the limit is not 0
func (cR *Caps) IsLimited() bool {
return cap(cR.aReqs) != 0
}
// Allocated returns the number of requests actively serviced
func (cR *Caps) Allocated() int {
return len(cR.aReqs)
}
// Allocate will reserve a channel for the API call
func (cR *Caps) Allocate() (err error) {
switch cR.strategy {
case utils.MetaBusy:
if len(cR.aReqs) == cap(cR.aReqs) {
return utils.ErrMaxConcurentRPCExceededNoCaps
}
fallthrough
case utils.MetaQueue:
cR.aReqs <- struct{}{}
}
return
}
// Deallocate will free a channel for the API call
func (cR *Caps) Deallocate() {
<-cR.aReqs
return
}
// NewCapsStats returns the stats for the caps
func NewCapsStats(sampleinterval time.Duration, caps *Caps, stopChan chan struct{}) (cs *CapsStats) {
st, _ := NewStatAverage(1, utils.MetaDynReq, nil)
cs = &CapsStats{st: st}
go cs.loop(sampleinterval, stopChan, caps)
return
}
// CapsStats stores the stats for caps
type CapsStats struct {
sync.RWMutex
st StatMetric
peak int
}
// OnEvict the function that should be called on cache eviction
func (cs *CapsStats) OnEvict(itmID string, value interface{}) {
cs.st.RemEvent(itmID)
}
func (cs *CapsStats) loop(intr time.Duration, stopChan chan struct{}, caps *Caps) {
for {
select {
case <-stopChan:
return
case <-time.After(intr):
evID := time.Now().String()
val := caps.Allocated()
cs.addSample(evID, val)
}
}
}
func (cs *CapsStats) addSample(evID string, val int) {
cs.Lock()
Cache.SetWithoutReplicate(utils.CacheCapsEvents, evID, val, nil, true, utils.NonTransactional)
cs.st.AddEvent(evID, floatDP(val))
if val > cs.peak {
cs.peak = val
}
cs.Unlock()
}
// GetPeak returns the maximum allocated caps
func (cs *CapsStats) GetPeak() (peak int) {
cs.RLock()
peak = cs.peak
cs.RUnlock()
return
}
// GetAverage returns the average allocated caps
func (cs *CapsStats) GetAverage(roundingDecimals int) (avg float64) {
cs.RLock()
avg = cs.st.GetFloat64Value(roundingDecimals)
cs.RUnlock()
return
}
// floatDP should be only used by capstats
type floatDP float64
func (f floatDP) String() string { return strconv.FormatFloat(float64(f), 'f', -1, 64) }
func (f floatDP) FieldAsInterface(fldPath []string) (interface{}, error) { return float64(f), nil }
func (f floatDP) FieldAsString(fldPath []string) (string, error) { return f.String(), nil }
func (f floatDP) RemoteHost() net.Addr { return nil }

146
engine/caps_test.go Normal file
View File

@@ -0,0 +1,146 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"reflect"
"runtime"
"testing"
"time"
"github.com/cgrates/cgrates/utils"
)
func TestNewCaps(t *testing.T) {
exp := &Caps{
strategy: utils.MetaBusy,
aReqs: make(chan struct{}, 0),
}
cs := NewCaps(0, utils.MetaBusy)
// only check the strategy
if !reflect.DeepEqual(exp.strategy, cs.strategy) {
t.Errorf("Expected: %v ,received: %v", exp, cs)
}
if cs.IsLimited() {
t.Errorf("Expected to not be limited")
}
if al := cs.Allocated(); al != 0 {
t.Errorf("Expected: %v ,received: %v", 0, al)
}
if err := cs.Allocate(); err != utils.ErrMaxConcurentRPCExceededNoCaps {
t.Errorf("Expected: %v ,received: %v", utils.ErrMaxConcurentRPCExceededNoCaps, err)
}
cs = NewCaps(1, utils.MetaBusy)
if err := cs.Allocate(); err != nil {
t.Error(err)
}
cs.Deallocate()
}
func TestCapsStats(t *testing.T) {
st, err := NewStatAverage(1, utils.MetaDynReq, nil)
if err != nil {
t.Error(err)
}
exp := &CapsStats{st: st}
cr := NewCaps(0, utils.MetaBusy)
exitChan := make(chan struct{}, 1)
close(exitChan)
cs := NewCapsStats(1, cr, exitChan)
if !reflect.DeepEqual(exp, cs) {
t.Errorf("Expected: %v ,received: %v", exp, cs)
}
<-exitChan
exitChan = make(chan struct{}, 1)
go func() {
runtime.Gosched()
time.Sleep(100)
close(exitChan)
}()
cr = NewCaps(10, utils.MetaBusy)
cr.Allocate()
cr.Allocate()
cs.loop(1, exitChan, cr)
if avg := cs.GetAverage(2); avg <= 0 {
t.Errorf("Expected at least an event to be processed: %v", avg)
}
if pk := cs.GetPeak(); pk != 2 {
t.Errorf("Expected the peak to be 2 received: %v", pk)
}
<-exitChan
}
func TestCapsStatsGetAverage(t *testing.T) {
st, err := NewStatAverage(1, utils.MetaDynReq, nil)
if err != nil {
t.Error(err)
}
cs := &CapsStats{st: st}
cs.addSample("1", 10)
expAvg := 10.
if avg := cs.GetAverage(2); avg != expAvg {
t.Errorf("Expected: %v ,received: %v", expAvg, avg)
}
expPk := 10
if pk := cs.GetPeak(); pk != expPk {
t.Errorf("Expected: %v ,received:%v", expPk, pk)
}
cs.addSample("2", 16)
expAvg = 13.
if avg := cs.GetAverage(2); avg != expAvg {
t.Errorf("Expected: %v ,received: %v", expAvg, avg)
}
expPk = 16
if pk := cs.GetPeak(); pk != expPk {
t.Errorf("Expected: %v ,received:%v", expPk, pk)
}
cs.OnEvict("2", nil)
expAvg = 10.
if avg := cs.GetAverage(2); avg != expAvg {
t.Errorf("Expected: %v ,received: %v", expAvg, avg)
}
if pk := cs.GetPeak(); pk != expPk {
t.Errorf("Expected: %v ,received:%v", expPk, pk)
}
}
func TestFloatDP(t *testing.T) {
f := floatDP(10.)
expStr := "10"
if s := f.String(); s != expStr {
t.Errorf("Expected: %v ,received:%v", expStr, s)
}
if s, err := f.FieldAsString(nil); err != nil {
t.Error(err)
} else if s != expStr {
t.Errorf("Expected: %v ,received:%v", expStr, s)
}
if r := f.RemoteHost(); r != nil {
t.Errorf("Expected remote host to be nil received:%v", r)
}
exp := 10.
if s, err := f.FieldAsInterface(nil); err != nil {
t.Error(err)
} else if s != exp {
t.Errorf("Expected: %v ,received:%v", exp, s)
}
}

View File

@@ -105,7 +105,7 @@ func testLoaderITInitDataDB(t *testing.T) {
}
}
cacheChan := make(chan rpcclient.ClientConnector, 1)
cacheChan <- NewCacheS(lCfg, dataDbCsv)
cacheChan <- NewCacheS(lCfg, dataDbCsv, nil)
connMgr = NewConnManager(lCfg, map[string]chan rpcclient.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): cacheChan,
})