diff --git a/agents/agentreq_test.go b/agents/agentreq_test.go index 66a4f97ce..3d2a7667b 100644 --- a/agents/agentreq_test.go +++ b/agents/agentreq_test.go @@ -1862,7 +1862,7 @@ func TestAgReqSetFieldsInCache(t *testing.T) { data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, dm) - engine.NewCacheS(cfg, dm) + engine.NewCacheS(cfg, dm, nil) agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) agReq.CGRRequest.Set(&utils.FullPath{Path: utils.Account, PathItems: utils.PathItems{{Field: utils.Account}}}, utils.NewNMData("1001")) @@ -1905,7 +1905,7 @@ func TestAgReqSetFieldsInCacheWithTimeOut(t *testing.T) { filterS := engine.NewFilterS(cfg, nil, dm) cfg.CacheCfg().Partitions[utils.CacheUCH].TTL = time.Second - engine.Cache = engine.NewCacheS(cfg, dm) + engine.Cache = engine.NewCacheS(cfg, dm, nil) agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) agReq.CGRRequest.Set(&utils.FullPath{Path: utils.Account, PathItems: utils.PathItems{{Field: utils.Account}}}, utils.NewNMData("1001")) diff --git a/apier/v1/sessions.go b/apier/v1/sessions.go index 7fbcd59e3..af8a73fac 100644 --- a/apier/v1/sessions.go +++ b/apier/v1/sessions.go @@ -19,13 +19,13 @@ along with this program. If not, see package v1 import ( - "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/dispatchers" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" ) -func NewSessionSv1(sS *sessions.SessionS, caps *cores.Caps) *SessionSv1 { +func NewSessionSv1(sS *sessions.SessionS, caps *engine.Caps) *SessionSv1 { return &SessionSv1{ sS: sS, caps: caps, @@ -35,7 +35,7 @@ func NewSessionSv1(sS *sessions.SessionS, caps *cores.Caps) *SessionSv1 { // SessionSv1 exports RPC from SessionSv1 type SessionSv1 struct { sS *sessions.SessionS - caps *cores.Caps + caps *engine.Caps } func (ssv1 *SessionSv1) AuthorizeEvent(args *sessions.V1AuthorizeArgs, diff --git a/apier/v1/smg.go b/apier/v1/smg.go index 27d834d68..ac8a46813 100644 --- a/apier/v1/smg.go +++ b/apier/v1/smg.go @@ -19,11 +19,11 @@ along with this program. If not, see package v1 import ( - "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessions" ) -func NewSMGenericV1(sS *sessions.SessionS, caps *cores.Caps) *SMGenericV1 { +func NewSMGenericV1(sS *sessions.SessionS, caps *engine.Caps) *SMGenericV1 { return &SMGenericV1{ Ss: sS, caps: caps, @@ -34,7 +34,7 @@ func NewSMGenericV1(sS *sessions.SessionS, caps *cores.Caps) *SMGenericV1 { // DEPRECATED, use SessionSv1 instead type SMGenericV1 struct { Ss *sessions.SessionS - caps *cores.Caps + caps *engine.Caps } // Returns MaxUsage (for calls in seconds), -1 for no limit diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index faab8742e..cae408bcb 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -76,8 +76,9 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, // initCacheS inits the CacheS and starts precaching as well as populating internal channel for RPC conns func initCacheS(internalCacheSChan chan rpcclient.ClientConnector, server *cores.Server, dm *engine.DataManager, exitChan chan<- struct{}, - anz *services.AnalyzerService) (chS *engine.CacheS) { - chS = engine.NewCacheS(cfg, dm) + anz *services.AnalyzerService, + cpS *engine.CapsStats) (chS *engine.CacheS) { + chS = engine.NewCacheS(cfg, dm, cpS) go func() { if err := chS.Precache(); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> could not init, error: %s", utils.CacheS, err.Error())) @@ -439,7 +440,7 @@ func main() { if len(utils.ConcurrentReqsStrategy) != 0 { cncReqsStrategy = utils.ConcurrentReqsStrategy } - caps := cores.NewCaps(cncReqsLimit, cncReqsStrategy) + caps := engine.NewCaps(cncReqsLimit, cncReqsStrategy) utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, goVers)) cfg.LazySanityCheck() @@ -541,13 +542,6 @@ func main() { } } - // init CacheS - cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan, anz) - engine.SetCache(cacheS) - - // init GuardianSv1 - initGuardianSv1(internalGuardianSChan, server, anz) - // init CoreSv1 coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz) if err := coreS.Start(); err != nil { @@ -555,6 +549,13 @@ func main() { return } + // init CacheS + cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan, anz, coreS.GetCoreS().CapsStats) + engine.SetCache(cacheS) + + // init GuardianSv1 + initGuardianSv1(internalGuardianSChan, server, anz) + // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, exitChan) attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, anz) diff --git a/cores/caps.go b/cores/caps.go index f3e56b7e0..9b6df467a 100644 --- a/cores/caps.go +++ b/cores/caps.go @@ -22,59 +22,12 @@ import ( "net" "net/rpc" "net/rpc/jsonrpc" - "strconv" - "sync" - "time" "github.com/cgrates/cgrates/analyzers" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -// Caps the structure that allocs requests for API -type Caps struct { - strategy string - aReqs chan struct{} -} - -// NewCaps creates a new caps -func NewCaps(reqs int, strategy string) *Caps { - return &Caps{ - strategy: strategy, - aReqs: make(chan struct{}, reqs), - } -} - -// IsLimited returns true if the limit is not 0 -func (cR *Caps) IsLimited() bool { - return cap(cR.aReqs) != 0 -} - -// Allocated returns the number of requests actively serviced -func (cR *Caps) Allocated() int { - return len(cR.aReqs) -} - -// Allocate will reserve a channel for the API call -func (cR *Caps) Allocate() (err error) { - switch cR.strategy { - case utils.MetaBusy: - if len(cR.aReqs) == cap(cR.aReqs) { - return utils.ErrMaxConcurentRPCExceededNoCaps - } - fallthrough - case utils.MetaQueue: - cR.aReqs <- struct{}{} - } - return -} - -// Deallocate will free a channel for the API call -func (cR *Caps) Deallocate() { - <-cR.aReqs - return -} - type conn interface { Read(b []byte) (n int, err error) Write(b []byte) (n int, err error) @@ -83,7 +36,7 @@ type conn interface { RemoteAddr() net.Addr } -func newCapsGOBCodec(conn conn, caps *Caps, anz *analyzers.AnalyzerService) (r rpc.ServerCodec) { +func newCapsGOBCodec(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) (r rpc.ServerCodec) { r = newCapsServerCodec(newGobServerCodec(conn), caps) if anz != nil { from := conn.RemoteAddr() @@ -101,7 +54,7 @@ func newCapsGOBCodec(conn conn, caps *Caps, anz *analyzers.AnalyzerService) (r r return } -func newCapsJSONCodec(conn conn, caps *Caps, anz *analyzers.AnalyzerService) (r rpc.ServerCodec) { +func newCapsJSONCodec(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) (r rpc.ServerCodec) { r = newCapsServerCodec(jsonrpc.NewServerCodec(conn), caps) if anz != nil { from := conn.RemoteAddr() @@ -119,7 +72,7 @@ func newCapsJSONCodec(conn conn, caps *Caps, anz *analyzers.AnalyzerService) (r return } -func newCapsServerCodec(sc rpc.ServerCodec, caps *Caps) rpc.ServerCodec { +func newCapsServerCodec(sc rpc.ServerCodec, caps *engine.Caps) rpc.ServerCodec { if !caps.IsLimited() { return sc } @@ -131,7 +84,7 @@ func newCapsServerCodec(sc rpc.ServerCodec, caps *Caps) rpc.ServerCodec { type capsServerCodec struct { sc rpc.ServerCodec - caps *Caps + caps *engine.Caps } func (c *capsServerCodec) ReadRequestHeader(r *rpc.Request) error { @@ -153,70 +106,3 @@ func (c *capsServerCodec) WriteResponse(r *rpc.Response, x interface{}) error { return c.sc.WriteResponse(r, x) } func (c *capsServerCodec) Close() error { return c.sc.Close() } - -// NewCapsStats returns the stats for the caps -func NewCapsStats(sampleinterval time.Duration, caps *Caps, stopChan chan struct{}) (cs *CapsStats) { - st, _ := engine.NewStatAverage(1, utils.MetaDynReq, nil) - cs = &CapsStats{st: st} - go cs.loop(sampleinterval, stopChan, caps) - return -} - -// CapsStats stores the stats for caps -type CapsStats struct { - sync.RWMutex - st engine.StatMetric - peak int -} - -// OnEvict the function that should be called on cache eviction -func (cs *CapsStats) OnEvict(itmID string, value interface{}) { - cs.st.RemEvent(itmID) -} - -func (cs *CapsStats) loop(intr time.Duration, stopChan chan struct{}, caps *Caps) { - for { - select { - case <-stopChan: - return - case <-time.After(intr): - evID := time.Now().String() - val := caps.Allocated() - cs.addSample(evID, val) - } - } -} - -func (cs *CapsStats) addSample(evID string, val int) { - cs.Lock() - engine.Cache.SetWithoutReplicate(utils.CacheCapsEvents, evID, val, nil, true, utils.NonTransactional) - cs.st.AddEvent(evID, floatDP(val)) - if val > cs.peak { - cs.peak = val - } - cs.Unlock() -} - -// GetPeak returns the maximum allocated caps -func (cs *CapsStats) GetPeak() (peak int) { - cs.RLock() - peak = cs.peak - cs.RUnlock() - return -} - -// GetAverage returns the average allocated caps -func (cs *CapsStats) GetAverage(roundingDecimals int) (avg float64) { - cs.RLock() - avg = cs.st.GetFloat64Value(roundingDecimals) - cs.RUnlock() - return -} - -// floatDP should be only used by capstats -type floatDP float64 - -func (f floatDP) String() string { return strconv.FormatFloat(float64(f), 'f', -1, 64) } -func (f floatDP) FieldAsInterface(fldPath []string) (interface{}, error) { return float64(f), nil } -func (f floatDP) FieldAsString(fldPath []string) (string, error) { return f.String(), nil } -func (f floatDP) RemoteHost() net.Addr { return nil } diff --git a/cores/caps_test.go b/cores/caps_test.go index c263820d8..958026aaa 100644 --- a/cores/caps_test.go +++ b/cores/caps_test.go @@ -22,133 +22,13 @@ import ( "net/rpc" "net/rpc/jsonrpc" "reflect" - "runtime" "testing" - "time" "github.com/cgrates/cgrates/analyzers" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -func TestNewCaps(t *testing.T) { - exp := &Caps{ - strategy: utils.MetaBusy, - aReqs: make(chan struct{}, 0), - } - cs := NewCaps(0, utils.MetaBusy) - - // only check the strategy - if !reflect.DeepEqual(exp.strategy, cs.strategy) { - t.Errorf("Expected: %v ,received: %v", exp, cs) - } - - if cs.IsLimited() { - t.Errorf("Expected to not be limited") - } - - if al := cs.Allocated(); al != 0 { - t.Errorf("Expected: %v ,received: %v", 0, al) - } - if err := cs.Allocate(); err != utils.ErrMaxConcurentRPCExceededNoCaps { - t.Errorf("Expected: %v ,received: %v", utils.ErrMaxConcurentRPCExceededNoCaps, err) - } - cs = NewCaps(1, utils.MetaBusy) - if err := cs.Allocate(); err != nil { - t.Error(err) - } - cs.Deallocate() -} - -func TestCapsStats(t *testing.T) { - st, err := engine.NewStatAverage(1, utils.MetaDynReq, nil) - if err != nil { - t.Error(err) - } - exp := &CapsStats{st: st} - cr := NewCaps(0, utils.MetaBusy) - exitChan := make(chan struct{}, 1) - close(exitChan) - cs := NewCapsStats(1, cr, exitChan) - if !reflect.DeepEqual(exp, cs) { - t.Errorf("Expected: %v ,received: %v", exp, cs) - } - <-exitChan - exitChan = make(chan struct{}, 1) - go func() { - runtime.Gosched() - time.Sleep(100) - close(exitChan) - }() - cr = NewCaps(10, utils.MetaBusy) - cr.Allocate() - cr.Allocate() - cs.loop(1, exitChan, cr) - if avg := cs.GetAverage(2); avg <= 0 { - t.Errorf("Expected at least an event to be processed: %v", avg) - } - if pk := cs.GetPeak(); pk != 2 { - t.Errorf("Expected the peak to be 2 received: %v", pk) - } - <-exitChan -} - -func TestCapsStatsGetAverage(t *testing.T) { - st, err := engine.NewStatAverage(1, utils.MetaDynReq, nil) - if err != nil { - t.Error(err) - } - cs := &CapsStats{st: st} - cs.addSample("1", 10) - expAvg := 10. - if avg := cs.GetAverage(2); avg != expAvg { - t.Errorf("Expected: %v ,received: %v", expAvg, avg) - } - expPk := 10 - if pk := cs.GetPeak(); pk != expPk { - t.Errorf("Expected: %v ,received:%v", expPk, pk) - } - cs.addSample("2", 16) - expAvg = 13. - if avg := cs.GetAverage(2); avg != expAvg { - t.Errorf("Expected: %v ,received: %v", expAvg, avg) - } - expPk = 16 - if pk := cs.GetPeak(); pk != expPk { - t.Errorf("Expected: %v ,received:%v", expPk, pk) - } - cs.OnEvict("2", nil) - expAvg = 10. - if avg := cs.GetAverage(2); avg != expAvg { - t.Errorf("Expected: %v ,received: %v", expAvg, avg) - } - if pk := cs.GetPeak(); pk != expPk { - t.Errorf("Expected: %v ,received:%v", expPk, pk) - } -} - -func TestFloatDP(t *testing.T) { - f := floatDP(10.) - expStr := "10" - if s := f.String(); s != expStr { - t.Errorf("Expected: %v ,received:%v", expStr, s) - } - if s, err := f.FieldAsString(nil); err != nil { - t.Error(err) - } else if s != expStr { - t.Errorf("Expected: %v ,received:%v", expStr, s) - } - if r := f.RemoteHost(); r != nil { - t.Errorf("Expected remote host to be nil received:%v", r) - } - exp := 10. - if s, err := f.FieldAsInterface(nil); err != nil { - t.Error(err) - } else if s != exp { - t.Errorf("Expected: %v ,received:%v", exp, s) - } -} - type mockServerCodec struct{} func (c *mockServerCodec) ReadRequestHeader(r *rpc.Request) (err error) { @@ -167,11 +47,11 @@ func (c *mockServerCodec) Close() error { return nil } func TestNewCapsServerCodec(t *testing.T) { mk := new(mockServerCodec) - cr := NewCaps(0, utils.MetaBusy) + cr := engine.NewCaps(0, utils.MetaBusy) if r := newCapsServerCodec(mk, cr); !reflect.DeepEqual(mk, r) { t.Errorf("Expected: %v ,received:%v", mk, r) } - cr = NewCaps(1, utils.MetaBusy) + cr = engine.NewCaps(1, utils.MetaBusy) exp := &capsServerCodec{ sc: mk, caps: cr, @@ -230,7 +110,7 @@ func (*mockConn) RemoteAddr() net.Addr { return utils.LocalAddr() } func TestNewCapsGOBCodec(t *testing.T) { conn := new(mockConn) - cr := NewCaps(0, utils.MetaBusy) + cr := engine.NewCaps(0, utils.MetaBusy) anz := &analyzers.AnalyzerService{} exp := newGobServerCodec(conn) if r := newCapsGOBCodec(conn, cr, nil); !reflect.DeepEqual(r, exp) { @@ -244,7 +124,7 @@ func TestNewCapsGOBCodec(t *testing.T) { func TestNewCapsJSONCodec(t *testing.T) { conn := new(mockConn) - cr := NewCaps(0, utils.MetaBusy) + cr := engine.NewCaps(0, utils.MetaBusy) anz := &analyzers.AnalyzerService{} exp := jsonrpc.NewServerCodec(conn) if r := newCapsJSONCodec(conn, cr, nil); !reflect.DeepEqual(r, exp) { diff --git a/cores/core.go b/cores/core.go index 466b72b65..3930fdd53 100644 --- a/cores/core.go +++ b/cores/core.go @@ -23,23 +23,24 @@ import ( "runtime" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -func NewCoreService(cfg *config.CGRConfig, caps *Caps, stopChan chan struct{}) *CoreService { - var st *CapsStats +func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, stopChan chan struct{}) *CoreService { + var st *engine.CapsStats if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 { - st = NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan) + st = engine.NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan) } return &CoreService{ cfg: cfg, - capsStats: st, + CapsStats: st, } } type CoreService struct { cfg *config.CGRConfig - capsStats *CapsStats + CapsStats *engine.CapsStats } // Shutdown is called to shutdown the service @@ -49,6 +50,7 @@ func (cS *CoreService) Shutdown() { return } +// Status returns the status of the engine func (cS *CoreService) Status(arg *utils.TenantWithOpts, reply *map[string]interface{}) (err error) { memstats := new(runtime.MemStats) runtime.ReadMemStats(memstats) diff --git a/cores/server.go b/cores/server.go index 0fc7234c9..a2bb0b283 100644 --- a/cores/server.go +++ b/cores/server.go @@ -39,6 +39,7 @@ import ( rpc2_jsonrpc "github.com/cenkalti/rpc2/jsonrpc" "github.com/cgrates/cgrates/analyzers" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cenkalti/rpc2" @@ -53,7 +54,7 @@ func init() { gob.Register(url.Values{}) } -func NewServer(caps *Caps) (s *Server) { +func NewServer(caps *engine.Caps) (s *Server) { return &Server{ httpMux: http.NewServeMux(), httpsMux: http.NewServeMux(), @@ -70,7 +71,7 @@ type Server struct { stopbiRPCServer chan struct{} // used in order to fully stop the biRPC httpsMux *http.ServeMux httpMux *http.ServeMux - caps *Caps + caps *engine.Caps anz *analyzers.AnalyzerService } @@ -347,12 +348,12 @@ type rpcRequest struct { r io.ReadCloser // holds the JSON formated RPC request rw io.ReadWriter // holds the JSON formated RPC response remoteAddr net.Addr - caps *Caps + caps *engine.Caps anzWarpper *analyzers.AnalyzerService } // newRPCRequest returns a new rpcRequest. -func newRPCRequest(r io.ReadCloser, remoteAddr net.Addr, caps *Caps, anz *analyzers.AnalyzerService) *rpcRequest { +func newRPCRequest(r io.ReadCloser, remoteAddr net.Addr, caps *engine.Caps, anz *analyzers.AnalyzerService) *rpcRequest { return &rpcRequest{ r: r, rw: new(bytes.Buffer), diff --git a/dispatcherh/libdispatcherh_test.go b/dispatcherh/libdispatcherh_test.go index 812de1d7a..d20e76cb6 100644 --- a/dispatcherh/libdispatcherh_test.go +++ b/dispatcherh/libdispatcherh_test.go @@ -162,7 +162,7 @@ func TestRegister(t *testing.T) { t.Fatal(err) } req.RemoteAddr = "127.0.0.1:2356" - engine.SetCache(engine.NewCacheS(config.CgrConfig(), nil)) + engine.SetCache(engine.NewCacheS(config.CgrConfig(), nil, nil)) if rplyID, err := register(req); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(id, *rplyID) { @@ -253,7 +253,7 @@ func TestRegister(t *testing.T) { }, } errCfg.CacheCfg().ReplicationConns = []string{"errCon"} - engine.SetCache(engine.NewCacheS(errCfg, nil)) + engine.SetCache(engine.NewCacheS(errCfg, nil, nil)) req.Body = ioutil.NopCloser(bytes.NewBuffer(uargsJSON)) if _, err := register(req); err != utils.ErrPartiallyExecuted { t.Errorf("Expected error: %s ,received: %v", utils.ErrPartiallyExecuted, err) @@ -297,7 +297,7 @@ func TestRegister(t *testing.T) { t.Errorf("Expected error,received: nil") } - engine.SetCache(engine.NewCacheS(config.CgrConfig(), nil)) + engine.SetCache(engine.NewCacheS(config.CgrConfig(), nil, nil)) } type errRecorder struct{} diff --git a/engine/caches.go b/engine/caches.go index f279aec1f..2a4a1dd3a 100644 --- a/engine/caches.go +++ b/engine/caches.go @@ -33,7 +33,7 @@ import ( var Cache *CacheS func init() { - Cache = NewCacheS(config.CgrConfig(), nil) + Cache = NewCacheS(config.CgrConfig(), nil, nil) // Threshold gob.Register(new(Threshold)) gob.Register(new(ThresholdProfile)) @@ -88,13 +88,14 @@ func SetCache(chS *CacheS) { } // NewCacheS initializes the Cache service and executes the precaching -func NewCacheS(cfg *config.CGRConfig, dm *DataManager) (c *CacheS) { +func NewCacheS(cfg *config.CGRConfig, dm *DataManager, cpS *CapsStats) (c *CacheS) { cfg.CacheCfg().AddTmpCaches() tCache := cfg.CacheCfg().AsTransCacheConfig() if len(cfg.CacheCfg().ReplicationConns) != 0 { var reply string for k, val := range tCache { - if !cfg.CacheCfg().Partitions[k].Replicate { + if !cfg.CacheCfg().Partitions[k].Replicate || + k == utils.CacheCapsEvents { continue } val.OnEvicted = func(itmID string, value interface{}) { @@ -109,6 +110,9 @@ func NewCacheS(cfg *config.CGRConfig, dm *DataManager) (c *CacheS) { } } + if _, has := tCache[utils.CacheCapsEvents]; has && cpS != nil { + tCache[utils.CacheCapsEvents].OnEvicted = cpS.OnEvict + } c = &CacheS{ cfg: cfg, dm: dm, diff --git a/engine/caps.go b/engine/caps.go new file mode 100644 index 000000000..eb184b004 --- /dev/null +++ b/engine/caps.go @@ -0,0 +1,139 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "net" + "strconv" + "sync" + "time" + + "github.com/cgrates/cgrates/utils" +) + +// Caps the structure that allocs requests for API +type Caps struct { + strategy string + aReqs chan struct{} +} + +// NewCaps creates a new caps +func NewCaps(reqs int, strategy string) *Caps { + return &Caps{ + strategy: strategy, + aReqs: make(chan struct{}, reqs), + } +} + +// IsLimited returns true if the limit is not 0 +func (cR *Caps) IsLimited() bool { + return cap(cR.aReqs) != 0 +} + +// Allocated returns the number of requests actively serviced +func (cR *Caps) Allocated() int { + return len(cR.aReqs) +} + +// Allocate will reserve a channel for the API call +func (cR *Caps) Allocate() (err error) { + switch cR.strategy { + case utils.MetaBusy: + if len(cR.aReqs) == cap(cR.aReqs) { + return utils.ErrMaxConcurentRPCExceededNoCaps + } + fallthrough + case utils.MetaQueue: + cR.aReqs <- struct{}{} + } + return +} + +// Deallocate will free a channel for the API call +func (cR *Caps) Deallocate() { + <-cR.aReqs + return +} + +// NewCapsStats returns the stats for the caps +func NewCapsStats(sampleinterval time.Duration, caps *Caps, stopChan chan struct{}) (cs *CapsStats) { + st, _ := NewStatAverage(1, utils.MetaDynReq, nil) + cs = &CapsStats{st: st} + go cs.loop(sampleinterval, stopChan, caps) + return +} + +// CapsStats stores the stats for caps +type CapsStats struct { + sync.RWMutex + st StatMetric + peak int +} + +// OnEvict the function that should be called on cache eviction +func (cs *CapsStats) OnEvict(itmID string, value interface{}) { + cs.st.RemEvent(itmID) +} + +func (cs *CapsStats) loop(intr time.Duration, stopChan chan struct{}, caps *Caps) { + for { + select { + case <-stopChan: + return + case <-time.After(intr): + evID := time.Now().String() + val := caps.Allocated() + cs.addSample(evID, val) + } + } +} + +func (cs *CapsStats) addSample(evID string, val int) { + cs.Lock() + Cache.SetWithoutReplicate(utils.CacheCapsEvents, evID, val, nil, true, utils.NonTransactional) + cs.st.AddEvent(evID, floatDP(val)) + if val > cs.peak { + cs.peak = val + } + cs.Unlock() +} + +// GetPeak returns the maximum allocated caps +func (cs *CapsStats) GetPeak() (peak int) { + cs.RLock() + peak = cs.peak + cs.RUnlock() + return +} + +// GetAverage returns the average allocated caps +func (cs *CapsStats) GetAverage(roundingDecimals int) (avg float64) { + cs.RLock() + avg = cs.st.GetFloat64Value(roundingDecimals) + cs.RUnlock() + return +} + +// floatDP should be only used by capstats +type floatDP float64 + +func (f floatDP) String() string { return strconv.FormatFloat(float64(f), 'f', -1, 64) } +func (f floatDP) FieldAsInterface(fldPath []string) (interface{}, error) { return float64(f), nil } +func (f floatDP) FieldAsString(fldPath []string) (string, error) { return f.String(), nil } +func (f floatDP) RemoteHost() net.Addr { return nil } diff --git a/engine/caps_test.go b/engine/caps_test.go new file mode 100644 index 000000000..ab02f7afc --- /dev/null +++ b/engine/caps_test.go @@ -0,0 +1,146 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "reflect" + "runtime" + "testing" + "time" + + "github.com/cgrates/cgrates/utils" +) + +func TestNewCaps(t *testing.T) { + exp := &Caps{ + strategy: utils.MetaBusy, + aReqs: make(chan struct{}, 0), + } + cs := NewCaps(0, utils.MetaBusy) + + // only check the strategy + if !reflect.DeepEqual(exp.strategy, cs.strategy) { + t.Errorf("Expected: %v ,received: %v", exp, cs) + } + + if cs.IsLimited() { + t.Errorf("Expected to not be limited") + } + + if al := cs.Allocated(); al != 0 { + t.Errorf("Expected: %v ,received: %v", 0, al) + } + if err := cs.Allocate(); err != utils.ErrMaxConcurentRPCExceededNoCaps { + t.Errorf("Expected: %v ,received: %v", utils.ErrMaxConcurentRPCExceededNoCaps, err) + } + cs = NewCaps(1, utils.MetaBusy) + if err := cs.Allocate(); err != nil { + t.Error(err) + } + cs.Deallocate() +} + +func TestCapsStats(t *testing.T) { + st, err := NewStatAverage(1, utils.MetaDynReq, nil) + if err != nil { + t.Error(err) + } + exp := &CapsStats{st: st} + cr := NewCaps(0, utils.MetaBusy) + exitChan := make(chan struct{}, 1) + close(exitChan) + cs := NewCapsStats(1, cr, exitChan) + if !reflect.DeepEqual(exp, cs) { + t.Errorf("Expected: %v ,received: %v", exp, cs) + } + <-exitChan + exitChan = make(chan struct{}, 1) + go func() { + runtime.Gosched() + time.Sleep(100) + close(exitChan) + }() + cr = NewCaps(10, utils.MetaBusy) + cr.Allocate() + cr.Allocate() + cs.loop(1, exitChan, cr) + if avg := cs.GetAverage(2); avg <= 0 { + t.Errorf("Expected at least an event to be processed: %v", avg) + } + if pk := cs.GetPeak(); pk != 2 { + t.Errorf("Expected the peak to be 2 received: %v", pk) + } + <-exitChan +} + +func TestCapsStatsGetAverage(t *testing.T) { + st, err := NewStatAverage(1, utils.MetaDynReq, nil) + if err != nil { + t.Error(err) + } + cs := &CapsStats{st: st} + cs.addSample("1", 10) + expAvg := 10. + if avg := cs.GetAverage(2); avg != expAvg { + t.Errorf("Expected: %v ,received: %v", expAvg, avg) + } + expPk := 10 + if pk := cs.GetPeak(); pk != expPk { + t.Errorf("Expected: %v ,received:%v", expPk, pk) + } + cs.addSample("2", 16) + expAvg = 13. + if avg := cs.GetAverage(2); avg != expAvg { + t.Errorf("Expected: %v ,received: %v", expAvg, avg) + } + expPk = 16 + if pk := cs.GetPeak(); pk != expPk { + t.Errorf("Expected: %v ,received:%v", expPk, pk) + } + cs.OnEvict("2", nil) + expAvg = 10. + if avg := cs.GetAverage(2); avg != expAvg { + t.Errorf("Expected: %v ,received: %v", expAvg, avg) + } + if pk := cs.GetPeak(); pk != expPk { + t.Errorf("Expected: %v ,received:%v", expPk, pk) + } +} + +func TestFloatDP(t *testing.T) { + f := floatDP(10.) + expStr := "10" + if s := f.String(); s != expStr { + t.Errorf("Expected: %v ,received:%v", expStr, s) + } + if s, err := f.FieldAsString(nil); err != nil { + t.Error(err) + } else if s != expStr { + t.Errorf("Expected: %v ,received:%v", expStr, s) + } + if r := f.RemoteHost(); r != nil { + t.Errorf("Expected remote host to be nil received:%v", r) + } + exp := 10. + if s, err := f.FieldAsInterface(nil); err != nil { + t.Error(err) + } else if s != exp { + t.Errorf("Expected: %v ,received:%v", exp, s) + } +} diff --git a/engine/z_loader_it_test.go b/engine/z_loader_it_test.go index 693bdff89..5c7a96e81 100644 --- a/engine/z_loader_it_test.go +++ b/engine/z_loader_it_test.go @@ -105,7 +105,7 @@ func testLoaderITInitDataDB(t *testing.T) { } } cacheChan := make(chan rpcclient.ClientConnector, 1) - cacheChan <- NewCacheS(lCfg, dataDbCsv) + cacheChan <- NewCacheS(lCfg, dataDbCsv, nil) connMgr = NewConnManager(lCfg, map[string]chan rpcclient.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): cacheChan, }) diff --git a/services/apiers_it_test.go b/services/apiers_it_test.go index e8f58826f..fbacb0426 100644 --- a/services/apiers_it_test.go +++ b/services/apiers_it_test.go @@ -42,7 +42,7 @@ func TestApiersReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil engineShutdown := make(chan struct{}, 2) - chS := engine.NewCacheS(cfg, nil) + chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) close(chS.GetPrecacheChannel(utils.CacheThresholds)) close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) diff --git a/services/asteriskagent_it_test.go b/services/asteriskagent_it_test.go index fcd72bb8d..7c0cbf672 100644 --- a/services/asteriskagent_it_test.go +++ b/services/asteriskagent_it_test.go @@ -43,7 +43,7 @@ func TestAsteriskAgentReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + chS := engine.NewCacheS(cfg, nil, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) cacheSChan <- chS diff --git a/services/attributes_it_test.go b/services/attributes_it_test.go index 8a956f809..5fe50c9b3 100644 --- a/services/attributes_it_test.go +++ b/services/attributes_it_test.go @@ -42,7 +42,7 @@ func TestAttributeSReload(t *testing.T) { utils.Logger.SetLogLevel(7) engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + chS := engine.NewCacheS(cfg, nil, nil) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) diff --git a/services/cdrs_it_test.go b/services/cdrs_it_test.go index d3f51d149..0bc4dfdb6 100644 --- a/services/cdrs_it_test.go +++ b/services/cdrs_it_test.go @@ -42,7 +42,7 @@ func TestCdrsReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheChargerProfiles)) close(chS.GetPrecacheChannel(utils.CacheChargerFilterIndexes)) diff --git a/services/chargers_it_test.go b/services/chargers_it_test.go index 240b84350..bb536aa72 100644 --- a/services/chargers_it_test.go +++ b/services/chargers_it_test.go @@ -42,7 +42,7 @@ func TestChargerSReload(t *testing.T) { utils.Logger.SetLogLevel(7) cfg.AttributeSCfg().Enabled = true engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) close(chS.GetPrecacheChannel(utils.CacheChargerProfiles)) diff --git a/services/cores.go b/services/cores.go index 9deb741b7..1ce4ff2e1 100644 --- a/services/cores.go +++ b/services/cores.go @@ -25,14 +25,14 @@ import ( v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" - "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) // NewCoreService returns the Core Service -func NewCoreService(cfg *config.CGRConfig, caps *cores.Caps, server *cores.Server, - internalCoreSChan chan rpcclient.ClientConnector, anz *AnalyzerService) servmanager.Service { +func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *cores.Server, + internalCoreSChan chan rpcclient.ClientConnector, anz *AnalyzerService) *CoreService { return &CoreService{ connChan: internalCoreSChan, cfg: cfg, @@ -47,7 +47,7 @@ type CoreService struct { sync.RWMutex cfg *config.CGRConfig server *cores.Server - caps *cores.Caps + caps *engine.Caps stopChan chan struct{} cS *cores.CoreService @@ -108,3 +108,10 @@ func (cS *CoreService) ServiceName() string { func (cS *CoreService) ShouldRun() bool { return true } + +// GetCoreS returns the coreS +func (cS *CoreService) GetCoreS() *cores.CoreService { + cS.RLock() + defer cS.RUnlock() + return cS.cS +} diff --git a/services/datadb_it_test.go b/services/datadb_it_test.go index f1163a6ff..c64cf6476 100644 --- a/services/datadb_it_test.go +++ b/services/datadb_it_test.go @@ -43,7 +43,7 @@ func TestDataDBReload(t *testing.T) { utils.Logger.SetLogLevel(7) engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + chS := engine.NewCacheS(cfg, nil, nil) filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) diff --git a/services/diameteragent_it_test.go b/services/diameteragent_it_test.go index 341fd80ab..852fc3f2d 100644 --- a/services/diameteragent_it_test.go +++ b/services/diameteragent_it_test.go @@ -43,7 +43,7 @@ func TestDiameterAgentReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + chS := engine.NewCacheS(cfg, nil, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) cacheSChan <- chS diff --git a/services/dispatchers_it_test.go b/services/dispatchers_it_test.go index 88011abb6..d3ffafed7 100644 --- a/services/dispatchers_it_test.go +++ b/services/dispatchers_it_test.go @@ -42,7 +42,7 @@ func TestDispatcherSReload(t *testing.T) { utils.Logger.SetLogLevel(7) cfg.AttributeSCfg().Enabled = true engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) close(chS.GetPrecacheChannel(utils.CacheDispatcherProfiles)) diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go index 65a8f3a27..e0d65cc96 100644 --- a/services/dnsagent_it_test.go +++ b/services/dnsagent_it_test.go @@ -43,7 +43,7 @@ func TestDNSAgentReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + chS := engine.NewCacheS(cfg, nil, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) cacheSChan <- chS diff --git a/services/ees_it_test.go b/services/ees_it_test.go index 95ec443c6..de9fd6dc9 100644 --- a/services/ees_it_test.go +++ b/services/ees_it_test.go @@ -57,7 +57,7 @@ func TestEventExporterSReload(t *testing.T) { server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - chS := engine.NewCacheS(cfg, nil) + chS := engine.NewCacheS(cfg, nil, nil) close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) diff --git a/services/freeswitchagent_it_test.go b/services/freeswitchagent_it_test.go index 897ca57b0..c06fbf709 100644 --- a/services/freeswitchagent_it_test.go +++ b/services/freeswitchagent_it_test.go @@ -43,7 +43,7 @@ func TestFreeSwitchAgentReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + chS := engine.NewCacheS(cfg, nil, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) cacheSChan <- chS diff --git a/services/kamailioagent_it_test.go b/services/kamailioagent_it_test.go index f266d62b7..457665bac 100644 --- a/services/kamailioagent_it_test.go +++ b/services/kamailioagent_it_test.go @@ -43,7 +43,7 @@ func TestKamailioAgentReload(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil engineShutdown := make(chan struct{}, 1) - chS := engine.NewCacheS(cfg, nil) + chS := engine.NewCacheS(cfg, nil, nil) cacheSChan := make(chan rpcclient.ClientConnector, 1) cacheSChan <- chS diff --git a/services/sessions.go b/services/sessions.go index 754e08026..8b3589014 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -37,7 +37,7 @@ import ( func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, server *cores.Server, internalChan chan rpcclient.ClientConnector, exitChan chan<- struct{}, connMgr *engine.ConnManager, - caps *cores.Caps, + caps *engine.Caps, anz *AnalyzerService) servmanager.Service { return &SessionService{ connChan: internalChan, @@ -68,7 +68,7 @@ type SessionService struct { // in order to stop the bircp server if necesary bircpEnabled bool connMgr *engine.ConnManager - caps *cores.Caps + caps *engine.Caps anz *AnalyzerService }