Add CAPs counting to cgr-console status command

This commit is contained in:
arberkatellari
2024-03-01 12:58:00 +02:00
committed by Dan Christian Bogos
parent 49d6b8d565
commit 9be32c36da
8 changed files with 126 additions and 5 deletions

View File

@@ -58,6 +58,9 @@ var (
testConfigSReloadConfigCoreSDryRun,
testConfigSReloadConfigCoreS,
testConfigSKillEngine,
testConfigSStartEngineCAPSAllocated,
testConfigSCAPSPeak,
testConfigSKillEngine,
testConfigStartEngineWithConfigs,
testConfigStartEngineFromHTTP,
testConfigSKillEngine,
@@ -566,3 +569,62 @@ func testConfigSReloadConfigCoreS(t *testing.T) {
t.Errorf("Expected %q , received: %q", cfgStr, rpl)
}
}
func testConfigSStartEngineCAPSAllocated(t *testing.T) {
var err error
configCfgPath = path.Join(*dataDir, "conf", "samples", "caps_peak")
configCfg, err = config.NewCGRConfigFromPath(configCfgPath)
if err != nil {
t.Error(err)
}
if _, err := engine.StopStartEngine(configCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
configRPC, err = newRPCClient(configCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
var rply map[string]any
if err := configRPC.Call(context.Background(), utils.CoreSv1Status, &utils.TenantWithAPIOpts{}, &rply); err != nil {
t.Error(err)
} else if rply[utils.NodeID] != "CAPSPeakEngine" {
t.Errorf("Expected %+v , received: %+v ", "CAPSPeakEngine", rply)
}
if _, has := rply[utils.CAPSAllocated]; !has {
t.Errorf("Expected reply to contain CAPSAllocated , received <%+v>", rply)
}
}
func testConfigSCAPSPeak(t *testing.T) {
cfgStr := `{"cores":{"caps":2,"caps_stats_interval":"100ms","caps_strategy":"*queue","shutdown_timeout":"1s"}}`
var reply string
if err := configRPC.Call(context.Background(), utils.ConfigSv1ReloadConfig, &config.ReloadArgs{
Tenant: "cgrates.org",
Path: path.Join(*dataDir, "conf", "samples", "caps_peak"),
Section: config.CoreSCfgJson,
}, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Expected OK received: %s", reply)
}
var rpl string
if err := configRPC.Call(context.Background(), utils.ConfigSv1GetConfigAsJSON, &config.SectionWithAPIOpts{
Tenant: "cgrates.org",
Section: config.CoreSCfgJson,
}, &rpl); err != nil {
t.Error(err)
} else if cfgStr != rpl {
t.Errorf("Expected %q , received: %q", cfgStr, rpl)
}
var rply map[string]any
if err := configRPC.Call(context.Background(), utils.CoreSv1Status, &utils.TenantWithAPIOpts{}, &rply); err != nil {
t.Error(err)
}
if _, has := rply[utils.CAPSPeak]; !has {
t.Errorf("Expected reply to contain CAPSPeak , received <%+v>", rply)
}
}

View File

@@ -60,8 +60,8 @@ const CGRATES_CFG_JSON = `
"cores": {
"caps": 0, // maximum concurrent request allowed ( 0 to disabled )
"caps_strategy": "*busy", // strategy in case in case of concurrent requests reached
"caps_stats_interval": "0", // the interval we sample for caps stats ( 0 to disabled )
"caps_strategy": "*busy", // strategy in case of concurrent requests reached
"caps_stats_interval": "0", // the interval duration we sample for caps stats ( 0 to disabled )
"shutdown_timeout": "1s" // the duration to wait until all services are stopped
},

View File

@@ -49,6 +49,7 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, fileCPU io.Closer,
CapsStats: st,
fileCPU: fileCPU,
fileMEM: fileMem,
caps: caps,
}
}
@@ -61,6 +62,7 @@ type CoreService struct {
fileMEM string
fileCPU io.Closer
fileMx sync.Mutex
caps *engine.Caps
}
// Shutdown is called to shutdown the service
@@ -141,6 +143,12 @@ func (cS *CoreService) V1Status(_ *context.Context, _ *utils.TenantWithAPIOpts,
}
response[utils.RunningSince] = utils.GetStartTime()
response[utils.GoVersion] = runtime.Version()
if cS.cfg.CoreSCfg().Caps != 0 {
response[utils.CAPSAllocated] = cS.caps.Allocated()
if cS.cfg.CoreSCfg().CapsStatsInterval != 0 {
response[utils.CAPSPeak] = cS.CapsStats.GetPeak()
}
}
*reply = response
return
}

View File

@@ -47,11 +47,12 @@ func TestNewCoreService(t *testing.T) {
stopMemPrf: stopMemPrf,
cfg: cfgDflt,
CapsStats: sts,
caps: caps,
}
rcv := NewCoreService(cfgDflt, caps, nil, "/tmp", stopchan, shdWg, stopMemPrf, shdChan)
if !reflect.DeepEqual(expected, rcv) {
t.Errorf("Expected %+v, received %+v", utils.ToJSON(expected), utils.ToJSON(rcv))
t.Errorf("Expected %+v, received %+v", expected, rcv)
}
close(stopchan)
//shut down the service

View File

@@ -1,7 +1,7 @@
---
kafka_download_base_url: https://downloads.apache.org/kafka
kafka_download_validate_certs: yes
kafka_version: 3.6.0
kafka_version: 3.7.0
kafka_scala_version: 2.13
# The kafka user and group to create files/dirs with and for running the kafka service

View File

@@ -0,0 +1,48 @@
{
"general": {
"log_level": 7,
"node_id": "CAPSPeakEngine",
"reply_timeout": "50s",
},
"cores": {
"caps": 2,
"caps_strategy": "*queue",
"caps_stats_interval":"100ms"
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080"
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "redis", // data_db type: <redis|mongo>
"db_port": 6379, // data_db port to reach the database
"db_name": "10" // data_db database name to connect to
},
"stor_db": {
"db_password": "CGRateS.org"
},
"apiers": {
"enabled": true
},
"sessions": {
"enabled": true,
"listen_bijson": "127.0.0.1:2014"
},
}

View File

@@ -1,7 +1,7 @@
#!/bin/bash
mongo --quiet create_user.js
mongosh --quiet create_user.js
cu=$?
if [ $cu = 0 ]; then

View File

@@ -362,6 +362,8 @@ const (
UpdatedAt = "UpdatedAt"
NodeID = "NodeID"
ActiveGoroutines = "ActiveGoroutines"
CAPSAllocated = "CAPSAllocated"
CAPSPeak = "CAPSPeak"
MemoryUsage = "MemoryUsage"
RunningSince = "RunningSince"
GoVersion = "GoVersion"