mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add connection from StatS to Threshold through ConnManager
This commit is contained in:
@@ -494,6 +494,8 @@ func main() {
|
||||
internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
|
||||
internalSessionSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
|
||||
internalChargerSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
|
||||
internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
|
||||
internalStatSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
|
||||
|
||||
// init CacheS
|
||||
cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan)
|
||||
@@ -511,7 +513,7 @@ func main() {
|
||||
//utils.ApierV1: rals.GetAPIv1().GetIntenternalChan(),
|
||||
//utils.ApierV2: rals.GetAPIv2().GetIntenternalChan(),
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): internalAttributeSChan,
|
||||
utils.CacheSv1: internalCacheSChan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): internalCacheSChan,
|
||||
//utils.CDRsV1: cdrS.GetIntenternalChan(),
|
||||
//utils.CDRsV2: cdrS.GetIntenternalChan(),
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers): internalChargerSChan,
|
||||
@@ -521,12 +523,12 @@ func main() {
|
||||
//utils.Responder: rals.GetResponder().GetIntenternalChan(),
|
||||
//utils.SchedulerSv1: schS.GetIntenternalChan(),
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS): internalSessionSChan,
|
||||
//utils.StatSv1: stS.GetIntenternalChan(),
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStatS): internalStatSChan,
|
||||
//utils.SupplierSv1: supS.GetIntenternalChan(),
|
||||
//utils.ThresholdSv1: tS.GetIntenternalChan(),
|
||||
utils.ServiceManagerV1: internalServeManagerChan,
|
||||
utils.ConfigSv1: internalConfigChan,
|
||||
utils.CoreSv1: internalCoreSv1Chan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds): internalThresholdSChan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager): internalServeManagerChan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig): internalConfigChan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore): internalCoreSv1Chan,
|
||||
//utils.RALsV1: rals.GetIntenternalChan(),
|
||||
})
|
||||
|
||||
@@ -534,9 +536,9 @@ func main() {
|
||||
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, internalDispatcherSChan)
|
||||
chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server,
|
||||
internalChargerSChan, connManager.GetConnMgr())
|
||||
tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server)
|
||||
tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan)
|
||||
stS := services.NewStatService(cfg, dmService, cacheS, filterSChan, server,
|
||||
tS.GetIntenternalChan(), dspS.GetIntenternalChan())
|
||||
internalStatSChan, connManager.GetConnMgr())
|
||||
reS := services.NewResourceService(cfg, dmService, cacheS, filterSChan, server,
|
||||
tS.GetIntenternalChan(), dspS.GetIntenternalChan())
|
||||
supS := services.NewSupplierService(cfg, dmService, cacheS, filterSChan, server,
|
||||
|
||||
@@ -177,9 +177,7 @@ func TestCGRConfigReloadStatS(t *testing.T) {
|
||||
StringIndexedFields: &[]string{utils.Account},
|
||||
PrefixIndexedFields: &[]string{},
|
||||
IndexedSelects: true,
|
||||
ThresholdSConns: []*RemoteHost{
|
||||
&RemoteHost{Address: "127.0.0.1:2012", Transport: utils.MetaJSON},
|
||||
},
|
||||
ThresholdSConns: []string{utils.MetaLocalHost},
|
||||
}
|
||||
if !reflect.DeepEqual(expAttr, cfg.StatSCfg()) {
|
||||
t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.StatSCfg()))
|
||||
|
||||
@@ -909,7 +909,7 @@ func TestDfStatServiceJsonCfg(t *testing.T) {
|
||||
Indexed_selects: utils.BoolPointer(true),
|
||||
Store_interval: utils.StringPointer(""),
|
||||
Store_uncompressed_limit: utils.IntPointer(0),
|
||||
Thresholds_conns: &[]*RemoteHostJson{},
|
||||
Thresholds_conns: &[]string{},
|
||||
String_indexed_fields: nil,
|
||||
Prefix_indexed_fields: &[]string{},
|
||||
}
|
||||
|
||||
@@ -845,7 +845,7 @@ func TestCgrCfgJSONDefaultStatsCfg(t *testing.T) {
|
||||
Enabled: false,
|
||||
IndexedSelects: true,
|
||||
StoreInterval: 0,
|
||||
ThresholdSConns: []*RemoteHost{},
|
||||
ThresholdSConns: []string{},
|
||||
StringIndexedFields: nil,
|
||||
PrefixIndexedFields: &[]string{},
|
||||
}
|
||||
|
||||
@@ -335,10 +335,13 @@ func (cfg *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
}
|
||||
// StatS checks
|
||||
if cfg.statsCfg.Enabled && !cfg.thresholdSCfg.Enabled && !cfg.dispatcherSCfg.Enabled {
|
||||
for _, connCfg := range cfg.statsCfg.ThresholdSConns {
|
||||
if connCfg.Address == utils.MetaInternal {
|
||||
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ThresholdS, utils.StatService)
|
||||
if cfg.statsCfg.Enabled {
|
||||
for _, connID := range cfg.statsCfg.ThresholdSConns {
|
||||
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.thresholdSCfg.Enabled {
|
||||
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ThresholdS, utils.StatS)
|
||||
}
|
||||
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
|
||||
return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.StatS, connID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -544,14 +544,10 @@ func TestConfigSanityResourceLimiter(t *testing.T) {
|
||||
func TestConfigSanityStatS(t *testing.T) {
|
||||
cfg, _ = NewDefaultCGRConfig()
|
||||
cfg.statsCfg = &StatSCfg{
|
||||
Enabled: true,
|
||||
ThresholdSConns: []*RemoteHost{
|
||||
&RemoteHost{
|
||||
Address: utils.MetaInternal,
|
||||
},
|
||||
},
|
||||
Enabled: true,
|
||||
ThresholdSConns: []string{utils.MetaInternal},
|
||||
}
|
||||
expected := "<ThresholdS> not enabled but requested by <StatS> component."
|
||||
expected := "<ThresholdS> not enabled but requested by <Stats> component."
|
||||
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {
|
||||
t.Errorf("Expecting: %+q received: %+q", expected, err)
|
||||
}
|
||||
@@ -622,7 +618,11 @@ func TestConfigSanityEventReader(t *testing.T) {
|
||||
t.Errorf("Expecting: %+q received: %+q", expected, err)
|
||||
}
|
||||
cfg.ersCfg.SessionSConns = []string{utils.MetaInternal}
|
||||
|
||||
expected = "<SessionS> not enabled but requested by <ERs> component."
|
||||
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {
|
||||
t.Errorf("Expecting: %+q received: %+q", expected, err)
|
||||
}
|
||||
cfg.sessionSCfg.Enabled = true
|
||||
cfg.ersCfg.Readers = []*EventReaderCfg{
|
||||
&EventReaderCfg{
|
||||
ID: "test",
|
||||
|
||||
@@ -426,7 +426,7 @@ type StatServJsonCfg struct {
|
||||
Indexed_selects *bool
|
||||
Store_interval *string
|
||||
Store_uncompressed_limit *int
|
||||
Thresholds_conns *[]*RemoteHostJson
|
||||
Thresholds_conns *[]string
|
||||
String_indexed_fields *[]string
|
||||
Prefix_indexed_fields *[]string
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ type StatSCfg struct {
|
||||
IndexedSelects bool
|
||||
StoreInterval time.Duration // Dump regularly from cache into dataDB
|
||||
StoreUncompressedLimit int
|
||||
ThresholdSConns []*RemoteHost
|
||||
ThresholdSConns []string
|
||||
StringIndexedFields *[]string
|
||||
PrefixIndexedFields *[]string
|
||||
}
|
||||
@@ -53,10 +53,14 @@ func (st *StatSCfg) loadFromJsonCfg(jsnCfg *StatServJsonCfg) (err error) {
|
||||
st.StoreUncompressedLimit = *jsnCfg.Store_uncompressed_limit
|
||||
}
|
||||
if jsnCfg.Thresholds_conns != nil {
|
||||
st.ThresholdSConns = make([]*RemoteHost, len(*jsnCfg.Thresholds_conns))
|
||||
for idx, jsnHaCfg := range *jsnCfg.Thresholds_conns {
|
||||
st.ThresholdSConns[idx] = NewDfltRemoteHost()
|
||||
st.ThresholdSConns[idx].loadFromJsonCfg(jsnHaCfg)
|
||||
st.ThresholdSConns = make([]string, len(*jsnCfg.Thresholds_conns))
|
||||
for idx, conn := range *jsnCfg.Thresholds_conns {
|
||||
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
|
||||
if conn == utils.MetaInternal {
|
||||
st.ThresholdSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)
|
||||
} else {
|
||||
st.ThresholdSConns[idx] = conn
|
||||
}
|
||||
}
|
||||
}
|
||||
if jsnCfg.String_indexed_fields != nil {
|
||||
|
||||
@@ -46,7 +46,7 @@ func TestStatSCfgloadFromJsonCfg(t *testing.T) {
|
||||
}`
|
||||
expected = StatSCfg{
|
||||
StoreInterval: time.Duration(time.Second * 2),
|
||||
ThresholdSConns: []*RemoteHost{},
|
||||
ThresholdSConns: []string{},
|
||||
PrefixIndexedFields: &[]string{"index1", "index2"},
|
||||
}
|
||||
if jsnCfg, err := NewCgrJsonCfgFromBytes([]byte(cfgJSONStr)); err != nil {
|
||||
|
||||
@@ -92,9 +92,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -92,9 +92,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -94,9 +94,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
"thresholds": {
|
||||
|
||||
@@ -50,9 +50,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
"thresholds": {
|
||||
|
||||
@@ -51,9 +51,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
"thresholds": {
|
||||
|
||||
@@ -51,9 +51,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
"thresholds": {
|
||||
|
||||
@@ -51,9 +51,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
"thresholds": {
|
||||
|
||||
@@ -51,9 +51,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
"thresholds": {
|
||||
|
||||
@@ -51,9 +51,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
"thresholds": {
|
||||
|
||||
@@ -54,9 +54,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -88,9 +88,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -131,9 +131,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
"thresholds": {
|
||||
|
||||
@@ -113,9 +113,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -85,9 +85,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -67,9 +67,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -65,9 +65,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "127.0.0.1:2012", "transport": "*json"}
|
||||
],
|
||||
"thresholds_conns": ["*localhost"],
|
||||
},
|
||||
|
||||
"thresholds": {
|
||||
|
||||
@@ -53,6 +53,14 @@
|
||||
},
|
||||
|
||||
|
||||
"rpc_conns": {
|
||||
"conn1": {
|
||||
"strategy": "first",
|
||||
"conns": [{"address": "127.0.0.1:2013", "transport":"*gob"}],
|
||||
},
|
||||
},
|
||||
|
||||
|
||||
"resources": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
@@ -65,9 +73,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "127.0.0.1:2013", "transport": "*gob"}
|
||||
],
|
||||
"thresholds_conns": ["conn1"],
|
||||
},
|
||||
|
||||
"thresholds": {
|
||||
|
||||
@@ -74,9 +74,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "-1",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
"thresholds": {
|
||||
|
||||
@@ -89,9 +89,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -150,9 +150,7 @@
|
||||
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"thresholds_conns": [
|
||||
{"address": "127.0.0.1:2012", "transport":"*json"}
|
||||
],
|
||||
"thresholds_conns": ["*localhost"],
|
||||
"string_indexed_fields": ["Account"]
|
||||
},
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
"rpc_conns": {
|
||||
"conn1": {
|
||||
"strategy": "first",
|
||||
"conns": [{{"address": "127.0.0.1:2013", "transport":"*gob"},}],
|
||||
"conns": [{"address": "127.0.0.1:2013", "transport":"*gob"}],
|
||||
},
|
||||
},
|
||||
|
||||
|
||||
@@ -89,9 +89,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -88,9 +88,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -94,9 +94,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
"thresholds": {
|
||||
|
||||
@@ -115,9 +115,7 @@
|
||||
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"thresholds_conns": [
|
||||
{"address": "127.0.0.1:2012", "transport":"*json"}
|
||||
],
|
||||
"thresholds_conns": ["*localhost"],
|
||||
"string_indexed_fields": ["Account"]
|
||||
},
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
"rpc_conns": {
|
||||
"conn1": {
|
||||
"strategy": "first",
|
||||
"conns": [{{"address": "127.0.0.1:2013", "transport":"*gob"},}],
|
||||
"conns": [{"address": "127.0.0.1:2013", "transport":"*gob"}],
|
||||
},
|
||||
},
|
||||
|
||||
@@ -123,9 +123,7 @@
|
||||
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"thresholds_conns": [
|
||||
{"address": "127.0.0.1:2013", "transport":"*gob"}
|
||||
],
|
||||
"thresholds_conns": ["conn1"],
|
||||
"string_indexed_fields": ["Account"]
|
||||
},
|
||||
|
||||
|
||||
@@ -213,9 +213,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
"thresholds": {
|
||||
|
||||
@@ -62,9 +62,7 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "1s",
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
},
|
||||
|
||||
|
||||
|
||||
@@ -105,9 +105,7 @@
|
||||
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
"string_indexed_fields": ["Account"],
|
||||
},
|
||||
|
||||
|
||||
@@ -105,9 +105,7 @@
|
||||
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"thresholds_conns": [
|
||||
{"address": "*internal"}
|
||||
],
|
||||
"thresholds_conns": ["*internal"],
|
||||
"string_indexed_fields": ["Account"],
|
||||
},
|
||||
|
||||
|
||||
@@ -21,25 +21,21 @@ package engine
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/guardian"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// NewStatService initializes a StatService
|
||||
func NewStatService(dm *DataManager, cgrcfg *config.CGRConfig,
|
||||
thdS rpcclient.ClientConnector, filterS *FilterS) (ss *StatService, err error) {
|
||||
if thdS != nil && reflect.ValueOf(thdS).IsNil() { // fix nil value in interface
|
||||
thdS = nil
|
||||
}
|
||||
filterS *FilterS, connMgr *ConnManager) (ss *StatService, err error) {
|
||||
|
||||
return &StatService{
|
||||
dm: dm,
|
||||
thdS: thdS,
|
||||
connMgr: connMgr,
|
||||
filterS: filterS,
|
||||
cgrcfg: cgrcfg,
|
||||
storedStatQueues: make(utils.StringMap),
|
||||
@@ -50,7 +46,7 @@ func NewStatService(dm *DataManager, cgrcfg *config.CGRConfig,
|
||||
// StatService builds stats for events
|
||||
type StatService struct {
|
||||
dm *DataManager
|
||||
thdS rpcclient.ClientConnector // rpc connection towards ThresholdS
|
||||
connMgr *ConnManager
|
||||
filterS *FilterS
|
||||
cgrcfg *config.CGRConfig
|
||||
loopStoped chan struct{}
|
||||
@@ -270,7 +266,7 @@ func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs [
|
||||
sS.ssqMux.Unlock()
|
||||
}
|
||||
}
|
||||
if sS.thdS != nil {
|
||||
if len(sS.cgrcfg.StatSCfg().ThresholdSConns) != 0 {
|
||||
var thIDs []string
|
||||
if len(sq.sqPrfl.ThresholdIDs) != 0 {
|
||||
if len(sq.sqPrfl.ThresholdIDs) == 1 && sq.sqPrfl.ThresholdIDs[0] == utils.META_NONE {
|
||||
@@ -294,7 +290,7 @@ func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs [
|
||||
thEv.Event[metricID] = metric.GetValue()
|
||||
}
|
||||
var tIDs []string
|
||||
if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
|
||||
if err := sS.connMgr.Call(sS.cgrcfg.StatSCfg().ThresholdSConns, utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<StatS> error: %s processing event %+v with ThresholdS.", err.Error(), thEv))
|
||||
@@ -425,9 +421,3 @@ func (sS *StatService) Reload() {
|
||||
func (sS *StatService) StartLoop() {
|
||||
go sS.runBackup()
|
||||
}
|
||||
|
||||
// SetThresholdConnection sets the new connection to the threshold service
|
||||
// only used on reload
|
||||
func (sS *StatService) SetThresholdConnection(thdS rpcclient.ClientConnector) {
|
||||
sS.thdS = thdS
|
||||
}
|
||||
|
||||
@@ -153,7 +153,7 @@ func TestStatQueuesPopulateService(t *testing.T) {
|
||||
defaultCfg.StatSCfg().StringIndexedFields = nil
|
||||
defaultCfg.StatSCfg().PrefixIndexedFields = nil
|
||||
statService, err = NewStatService(dmSTS, defaultCfg,
|
||||
nil, &FilterS{dm: dmSTS, cfg: defaultCfg})
|
||||
&FilterS{dm: dmSTS, cfg: defaultCfg}, nil)
|
||||
if err != nil {
|
||||
t.Errorf("Error: %+v", err)
|
||||
}
|
||||
|
||||
@@ -69,8 +69,8 @@ func TestCdrsReload(t *testing.T) {
|
||||
cfg.StorDbCfg().Type = utils.INTERNAL
|
||||
stordb := NewStorDBService(cfg)
|
||||
chrS := NewChargerService(cfg, db, chS, filterSChan, server, nil, nil)
|
||||
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server)
|
||||
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), nil)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1))
|
||||
ralS := NewRalService(cfg, db, stordb, chS, filterSChan, server,
|
||||
tS.GetIntenternalChan(), internalChan, cacheSChan, internalChan, internalChan,
|
||||
internalChan, schS, engineShutdown)
|
||||
|
||||
@@ -67,8 +67,8 @@ func TestRalsReload(t *testing.T) {
|
||||
db := NewDataDBService(cfg)
|
||||
cfg.StorDbCfg().Type = utils.INTERNAL
|
||||
stordb := NewStorDBService(cfg)
|
||||
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server)
|
||||
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), nil)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1))
|
||||
ralS := NewRalService(cfg, db, stordb, chS, filterSChan, server,
|
||||
tS.GetIntenternalChan(), internalChan, cacheSChan, internalChan, internalChan,
|
||||
internalChan, schS, engineShutdown)
|
||||
|
||||
@@ -24,6 +24,8 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/rpcclient"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
@@ -52,7 +54,7 @@ func TestResourceSReload(t *testing.T) {
|
||||
server := utils.NewServer()
|
||||
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
|
||||
db := NewDataDBService(cfg)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1))
|
||||
reS := NewResourceService(cfg, db, chS, filterSChan, server, tS.GetIntenternalChan(), nil)
|
||||
srvMngr.AddServices(NewConnManagerService(cfg, nil), tS, reS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
|
||||
@@ -33,30 +33,27 @@ import (
|
||||
// NewStatService returns the Stat Service
|
||||
func NewStatService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
|
||||
server *utils.Server, thrsChan,
|
||||
dispatcherChan chan rpcclient.ClientConnector) servmanager.Service {
|
||||
server *utils.Server, internalStatSChan chan rpcclient.RpcClientConnection, connMgr *engine.ConnManager) servmanager.Service {
|
||||
return &StatService{
|
||||
connChan: make(chan rpcclient.ClientConnector, 1),
|
||||
cfg: cfg,
|
||||
dm: dm,
|
||||
cacheS: cacheS,
|
||||
filterSChan: filterSChan,
|
||||
server: server,
|
||||
thrsChan: thrsChan,
|
||||
dispatcherChan: dispatcherChan,
|
||||
connChan: internalStatSChan,
|
||||
cfg: cfg,
|
||||
dm: dm,
|
||||
cacheS: cacheS,
|
||||
filterSChan: filterSChan,
|
||||
server: server,
|
||||
connMgr: connMgr,
|
||||
}
|
||||
}
|
||||
|
||||
// StatService implements Service interface
|
||||
type StatService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
dm *DataDBService
|
||||
cacheS *engine.CacheS
|
||||
filterSChan chan *engine.FilterS
|
||||
server *utils.Server
|
||||
thrsChan chan rpcclient.ClientConnector
|
||||
dispatcherChan chan rpcclient.ClientConnector
|
||||
cfg *config.CGRConfig
|
||||
dm *DataDBService
|
||||
cacheS *engine.CacheS
|
||||
filterSChan chan *engine.FilterS
|
||||
server *utils.Server
|
||||
connMgr *engine.ConnManager
|
||||
|
||||
sts *engine.StatService
|
||||
rpc *v1.StatSv1
|
||||
@@ -76,14 +73,9 @@ func (sts *StatService) Start() (err error) {
|
||||
filterS := <-sts.filterSChan
|
||||
sts.filterSChan <- filterS
|
||||
|
||||
var thdSConn rpcclient.ClientConnector
|
||||
if thdSConn, err = NewConnection(sts.cfg, sts.thrsChan, sts.dispatcherChan, sts.cfg.StatSCfg().ThresholdSConns); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.StatS, err.Error()))
|
||||
return
|
||||
}
|
||||
sts.Lock()
|
||||
defer sts.Unlock()
|
||||
sts.sts, err = engine.NewStatService(sts.dm.GetDM(), sts.cfg, thdSConn, filterS)
|
||||
sts.sts, err = engine.NewStatService(sts.dm.GetDM(), sts.cfg, filterS, sts.connMgr)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not init, error: %s", err.Error()))
|
||||
return
|
||||
@@ -105,13 +97,7 @@ func (sts *StatService) GetIntenternalChan() (conn chan rpcclient.ClientConnecto
|
||||
|
||||
// Reload handles the change of config
|
||||
func (sts *StatService) Reload() (err error) {
|
||||
var thdSConn rpcclient.ClientConnector
|
||||
if thdSConn, err = NewConnection(sts.cfg, sts.thrsChan, sts.dispatcherChan, sts.cfg.StatSCfg().ThresholdSConns); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.StatS, err.Error()))
|
||||
return
|
||||
}
|
||||
sts.Lock()
|
||||
sts.sts.SetThresholdConnection(thdSConn)
|
||||
sts.sts.Reload()
|
||||
sts.Unlock()
|
||||
return
|
||||
|
||||
@@ -24,6 +24,8 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/rpcclient"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
@@ -52,8 +54,8 @@ func TestStatSReload(t *testing.T) {
|
||||
server := utils.NewServer()
|
||||
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
|
||||
db := NewDataDBService(cfg)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server)
|
||||
sS := NewStatService(cfg, db, chS, filterSChan, server, tS.GetIntenternalChan(), nil)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1))
|
||||
sS := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), nil)
|
||||
srvMngr.AddServices(NewConnManagerService(cfg, nil), tS, sS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -24,6 +24,8 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/rpcclient"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
@@ -50,7 +52,7 @@ func TestSupplierSReload(t *testing.T) {
|
||||
server := utils.NewServer()
|
||||
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
|
||||
db := NewDataDBService(cfg)
|
||||
sts := NewStatService(cfg, db, chS, filterSChan, server, nil, nil)
|
||||
sts := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), nil)
|
||||
supS := NewSupplierService(cfg, db, chS, filterSChan, server, nil, sts.GetIntenternalChan(), nil, nil)
|
||||
srvMngr.AddServices(NewConnManagerService(cfg, nil), supS, sts, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
|
||||
@@ -33,9 +33,9 @@ import (
|
||||
// NewThresholdService returns the Threshold Service
|
||||
func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
|
||||
server *utils.Server) servmanager.Service {
|
||||
server *utils.Server, internalThresholdSChan chan rpcclient.RpcClientConnection) servmanager.Service {
|
||||
return &ThresholdService{
|
||||
connChan: make(chan rpcclient.ClientConnector, 1),
|
||||
connChan: internalThresholdSChan,
|
||||
cfg: cfg,
|
||||
dm: dm,
|
||||
cacheS: cacheS,
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
func TestThresholdSReload(t *testing.T) {
|
||||
@@ -48,7 +49,7 @@ func TestThresholdSReload(t *testing.T) {
|
||||
server := utils.NewServer()
|
||||
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
|
||||
db := NewDataDBService(cfg)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1))
|
||||
srvMngr.AddServices(NewConnManagerService(cfg, nil), tS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
Reference in New Issue
Block a user