From 27789c4d368ff8f59ad6528cd2754b88a5be99df Mon Sep 17 00:00:00 2001 From: TeoV Date: Thu, 5 Dec 2019 16:54:29 +0200 Subject: [PATCH] Add connection from StatS to Threshold through ConnManager --- cmd/cgr-engine/cgr-engine.go | 18 ++++---- config/config_it_test.go | 4 +- config/config_json_test.go | 2 +- config/config_test.go | 2 +- config/configsanity.go | 11 +++-- config/configsanity_test.go | 16 +++---- config/libconfig_json.go | 2 +- config/statscfg.go | 14 +++--- config/statscfg_test.go | 2 +- .../samples/acc_balance_keep/cgrates.json | 4 +- .../samples/acc_balance_keep_gob/cgrates.json | 4 +- data/conf/samples/cdrewithfilter/cgrates.json | 4 +- data/conf/samples/cdrsv2internal/cgrates.json | 4 +- data/conf/samples/cdrsv2mongo/cgrates.json | 4 +- .../conf/samples/cdrsv2mongo_gob/cgrates.json | 4 +- .../conf/samples/cdrsv2mysql/cdrsv2mysql.json | 4 +- .../samples/cdrsv2mysql_gob/cdrsv2mysql.json | 4 +- data/conf/samples/cdrsv2psql/cdrsv2psql.json | 4 +- data/conf/samples/dbinternal/cgrates.json | 4 +- .../samples/loaders/tutmongo/cgrates.json | 4 +- .../samples/loaders/tutmysql/cgrates.json | 4 +- data/conf/samples/mongoatlas/cgrates.json | 4 +- data/conf/samples/mongoreplica/cgrates.json | 4 +- data/conf/samples/sessions/cgrates.json | 4 +- data/conf/samples/tls/cgrates.json | 4 +- data/conf/samples/tls_gob/cgrates.json | 12 +++-- data/conf/samples/tutinternal/cgrates.json | 4 +- data/conf/samples/tutmongo/cgrates.json | 4 +- data/conf/samples/tutmongo2/cgrates.json | 4 +- data/conf/samples/tutmongo2_gob/cgrates.json | 2 +- data/conf/samples/tutmongo_gob/cgrates.json | 4 +- data/conf/samples/tutmongonew/cgrates.json | 4 +- data/conf/samples/tutmysql/cgrates.json | 4 +- data/conf/samples/tutmysql2/cgrates.json | 4 +- data/conf/samples/tutmysql2_gob/cgrates.json | 6 +-- .../samples/tutmysql_internal/cgrates.json | 4 +- data/conf/samples/tutpostgres/cgrates.json | 4 +- .../osips/cgrates/etc/cgrates/cgrates.json | 4 +- .../osips/cgrates/etc/cgrates/cgrates.json | 4 +- engine/stats.go | 22 +++------- engine/stats_test.go | 2 +- services/cdrs_it_test.go | 4 +- services/rals_it_test.go | 4 +- services/resources_it_test.go | 4 +- services/stats.go | 44 +++++++------------ services/stats_it_test.go | 6 ++- services/suppliers_it_test.go | 4 +- services/thresholds.go | 4 +- services/thresholds_it_test.go | 3 +- 49 files changed, 118 insertions(+), 178 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 0a3a16963..14eeb7b60 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -494,6 +494,8 @@ func main() { internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency internalSessionSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency internalChargerSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency + internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency + internalStatSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency // init CacheS cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan) @@ -511,7 +513,7 @@ func main() { //utils.ApierV1: rals.GetAPIv1().GetIntenternalChan(), //utils.ApierV2: rals.GetAPIv2().GetIntenternalChan(), utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): internalAttributeSChan, - utils.CacheSv1: internalCacheSChan, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): internalCacheSChan, //utils.CDRsV1: cdrS.GetIntenternalChan(), //utils.CDRsV2: cdrS.GetIntenternalChan(), utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers): internalChargerSChan, @@ -521,12 +523,12 @@ func main() { //utils.Responder: rals.GetResponder().GetIntenternalChan(), //utils.SchedulerSv1: schS.GetIntenternalChan(), utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS): internalSessionSChan, - //utils.StatSv1: stS.GetIntenternalChan(), + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStatS): internalStatSChan, //utils.SupplierSv1: supS.GetIntenternalChan(), - //utils.ThresholdSv1: tS.GetIntenternalChan(), - utils.ServiceManagerV1: internalServeManagerChan, - utils.ConfigSv1: internalConfigChan, - utils.CoreSv1: internalCoreSv1Chan, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds): internalThresholdSChan, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager): internalServeManagerChan, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig): internalConfigChan, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore): internalCoreSv1Chan, //utils.RALsV1: rals.GetIntenternalChan(), }) @@ -534,9 +536,9 @@ func main() { dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, internalDispatcherSChan) chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server, internalChargerSChan, connManager.GetConnMgr()) - tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server) + tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan) stS := services.NewStatService(cfg, dmService, cacheS, filterSChan, server, - tS.GetIntenternalChan(), dspS.GetIntenternalChan()) + internalStatSChan, connManager.GetConnMgr()) reS := services.NewResourceService(cfg, dmService, cacheS, filterSChan, server, tS.GetIntenternalChan(), dspS.GetIntenternalChan()) supS := services.NewSupplierService(cfg, dmService, cacheS, filterSChan, server, diff --git a/config/config_it_test.go b/config/config_it_test.go index 2c70c94da..81ea6b964 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -177,9 +177,7 @@ func TestCGRConfigReloadStatS(t *testing.T) { StringIndexedFields: &[]string{utils.Account}, PrefixIndexedFields: &[]string{}, IndexedSelects: true, - ThresholdSConns: []*RemoteHost{ - &RemoteHost{Address: "127.0.0.1:2012", Transport: utils.MetaJSON}, - }, + ThresholdSConns: []string{utils.MetaLocalHost}, } if !reflect.DeepEqual(expAttr, cfg.StatSCfg()) { t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.StatSCfg())) diff --git a/config/config_json_test.go b/config/config_json_test.go index 2131fdb34..c1ed9e7c2 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -909,7 +909,7 @@ func TestDfStatServiceJsonCfg(t *testing.T) { Indexed_selects: utils.BoolPointer(true), Store_interval: utils.StringPointer(""), Store_uncompressed_limit: utils.IntPointer(0), - Thresholds_conns: &[]*RemoteHostJson{}, + Thresholds_conns: &[]string{}, String_indexed_fields: nil, Prefix_indexed_fields: &[]string{}, } diff --git a/config/config_test.go b/config/config_test.go index 321d2bc0a..15c2a41cd 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -845,7 +845,7 @@ func TestCgrCfgJSONDefaultStatsCfg(t *testing.T) { Enabled: false, IndexedSelects: true, StoreInterval: 0, - ThresholdSConns: []*RemoteHost{}, + ThresholdSConns: []string{}, StringIndexedFields: nil, PrefixIndexedFields: &[]string{}, } diff --git a/config/configsanity.go b/config/configsanity.go index fdf923d9c..e6f7bdb52 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -335,10 +335,13 @@ func (cfg *CGRConfig) checkConfigSanity() error { } } // StatS checks - if cfg.statsCfg.Enabled && !cfg.thresholdSCfg.Enabled && !cfg.dispatcherSCfg.Enabled { - for _, connCfg := range cfg.statsCfg.ThresholdSConns { - if connCfg.Address == utils.MetaInternal { - return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ThresholdS, utils.StatService) + if cfg.statsCfg.Enabled { + for _, connID := range cfg.statsCfg.ThresholdSConns { + if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.thresholdSCfg.Enabled { + return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ThresholdS, utils.StatS) + } + if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) { + return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.StatS, connID) } } } diff --git a/config/configsanity_test.go b/config/configsanity_test.go index 14ddb6629..09c3fddab 100644 --- a/config/configsanity_test.go +++ b/config/configsanity_test.go @@ -544,14 +544,10 @@ func TestConfigSanityResourceLimiter(t *testing.T) { func TestConfigSanityStatS(t *testing.T) { cfg, _ = NewDefaultCGRConfig() cfg.statsCfg = &StatSCfg{ - Enabled: true, - ThresholdSConns: []*RemoteHost{ - &RemoteHost{ - Address: utils.MetaInternal, - }, - }, + Enabled: true, + ThresholdSConns: []string{utils.MetaInternal}, } - expected := " not enabled but requested by component." + expected := " not enabled but requested by component." if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } @@ -622,7 +618,11 @@ func TestConfigSanityEventReader(t *testing.T) { t.Errorf("Expecting: %+q received: %+q", expected, err) } cfg.ersCfg.SessionSConns = []string{utils.MetaInternal} - + expected = " not enabled but requested by component." + if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { + t.Errorf("Expecting: %+q received: %+q", expected, err) + } + cfg.sessionSCfg.Enabled = true cfg.ersCfg.Readers = []*EventReaderCfg{ &EventReaderCfg{ ID: "test", diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 79b3bb588..b8836f62c 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -426,7 +426,7 @@ type StatServJsonCfg struct { Indexed_selects *bool Store_interval *string Store_uncompressed_limit *int - Thresholds_conns *[]*RemoteHostJson + Thresholds_conns *[]string String_indexed_fields *[]string Prefix_indexed_fields *[]string } diff --git a/config/statscfg.go b/config/statscfg.go index 2ca5c7b13..2aafb115c 100644 --- a/config/statscfg.go +++ b/config/statscfg.go @@ -29,7 +29,7 @@ type StatSCfg struct { IndexedSelects bool StoreInterval time.Duration // Dump regularly from cache into dataDB StoreUncompressedLimit int - ThresholdSConns []*RemoteHost + ThresholdSConns []string StringIndexedFields *[]string PrefixIndexedFields *[]string } @@ -53,10 +53,14 @@ func (st *StatSCfg) loadFromJsonCfg(jsnCfg *StatServJsonCfg) (err error) { st.StoreUncompressedLimit = *jsnCfg.Store_uncompressed_limit } if jsnCfg.Thresholds_conns != nil { - st.ThresholdSConns = make([]*RemoteHost, len(*jsnCfg.Thresholds_conns)) - for idx, jsnHaCfg := range *jsnCfg.Thresholds_conns { - st.ThresholdSConns[idx] = NewDfltRemoteHost() - st.ThresholdSConns[idx].loadFromJsonCfg(jsnHaCfg) + st.ThresholdSConns = make([]string, len(*jsnCfg.Thresholds_conns)) + for idx, conn := range *jsnCfg.Thresholds_conns { + // if we have the connection internal we change the name so we can have internal rpc for each subsystem + if conn == utils.MetaInternal { + st.ThresholdSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds) + } else { + st.ThresholdSConns[idx] = conn + } } } if jsnCfg.String_indexed_fields != nil { diff --git a/config/statscfg_test.go b/config/statscfg_test.go index b7fbcf1af..ca487c4d7 100644 --- a/config/statscfg_test.go +++ b/config/statscfg_test.go @@ -46,7 +46,7 @@ func TestStatSCfgloadFromJsonCfg(t *testing.T) { }` expected = StatSCfg{ StoreInterval: time.Duration(time.Second * 2), - ThresholdSConns: []*RemoteHost{}, + ThresholdSConns: []string{}, PrefixIndexedFields: &[]string{"index1", "index2"}, } if jsnCfg, err := NewCgrJsonCfgFromBytes([]byte(cfgJSONStr)); err != nil { diff --git a/data/conf/samples/acc_balance_keep/cgrates.json b/data/conf/samples/acc_balance_keep/cgrates.json index 76256c201..86e2f1294 100644 --- a/data/conf/samples/acc_balance_keep/cgrates.json +++ b/data/conf/samples/acc_balance_keep/cgrates.json @@ -92,9 +92,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/conf/samples/acc_balance_keep_gob/cgrates.json b/data/conf/samples/acc_balance_keep_gob/cgrates.json index 89b2ccd1d..97e25f6c8 100644 --- a/data/conf/samples/acc_balance_keep_gob/cgrates.json +++ b/data/conf/samples/acc_balance_keep_gob/cgrates.json @@ -92,9 +92,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/conf/samples/cdrewithfilter/cgrates.json b/data/conf/samples/cdrewithfilter/cgrates.json index 6cd9700b4..803577668 100755 --- a/data/conf/samples/cdrewithfilter/cgrates.json +++ b/data/conf/samples/cdrewithfilter/cgrates.json @@ -94,9 +94,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, "thresholds": { diff --git a/data/conf/samples/cdrsv2internal/cgrates.json b/data/conf/samples/cdrsv2internal/cgrates.json index 23acbf23c..97c2cb68d 100644 --- a/data/conf/samples/cdrsv2internal/cgrates.json +++ b/data/conf/samples/cdrsv2internal/cgrates.json @@ -50,9 +50,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, "thresholds": { diff --git a/data/conf/samples/cdrsv2mongo/cgrates.json b/data/conf/samples/cdrsv2mongo/cgrates.json index 5a7cb9608..33ecb284e 100644 --- a/data/conf/samples/cdrsv2mongo/cgrates.json +++ b/data/conf/samples/cdrsv2mongo/cgrates.json @@ -51,9 +51,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, "thresholds": { diff --git a/data/conf/samples/cdrsv2mongo_gob/cgrates.json b/data/conf/samples/cdrsv2mongo_gob/cgrates.json index 267c27919..58978178c 100644 --- a/data/conf/samples/cdrsv2mongo_gob/cgrates.json +++ b/data/conf/samples/cdrsv2mongo_gob/cgrates.json @@ -51,9 +51,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, "thresholds": { diff --git a/data/conf/samples/cdrsv2mysql/cdrsv2mysql.json b/data/conf/samples/cdrsv2mysql/cdrsv2mysql.json index 1d5889995..0471d82af 100644 --- a/data/conf/samples/cdrsv2mysql/cdrsv2mysql.json +++ b/data/conf/samples/cdrsv2mysql/cdrsv2mysql.json @@ -51,9 +51,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, "thresholds": { diff --git a/data/conf/samples/cdrsv2mysql_gob/cdrsv2mysql.json b/data/conf/samples/cdrsv2mysql_gob/cdrsv2mysql.json index cd9a36cf5..028173859 100644 --- a/data/conf/samples/cdrsv2mysql_gob/cdrsv2mysql.json +++ b/data/conf/samples/cdrsv2mysql_gob/cdrsv2mysql.json @@ -51,9 +51,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, "thresholds": { diff --git a/data/conf/samples/cdrsv2psql/cdrsv2psql.json b/data/conf/samples/cdrsv2psql/cdrsv2psql.json index 274518ffd..640222717 100644 --- a/data/conf/samples/cdrsv2psql/cdrsv2psql.json +++ b/data/conf/samples/cdrsv2psql/cdrsv2psql.json @@ -51,9 +51,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, "thresholds": { diff --git a/data/conf/samples/dbinternal/cgrates.json b/data/conf/samples/dbinternal/cgrates.json index 67bb7674e..21a923cf0 100755 --- a/data/conf/samples/dbinternal/cgrates.json +++ b/data/conf/samples/dbinternal/cgrates.json @@ -54,9 +54,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/conf/samples/loaders/tutmongo/cgrates.json b/data/conf/samples/loaders/tutmongo/cgrates.json index 737264b60..a6603bb79 100644 --- a/data/conf/samples/loaders/tutmongo/cgrates.json +++ b/data/conf/samples/loaders/tutmongo/cgrates.json @@ -88,9 +88,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/conf/samples/loaders/tutmysql/cgrates.json b/data/conf/samples/loaders/tutmysql/cgrates.json index bfeb9c683..016246b6e 100644 --- a/data/conf/samples/loaders/tutmysql/cgrates.json +++ b/data/conf/samples/loaders/tutmysql/cgrates.json @@ -131,9 +131,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, "thresholds": { diff --git a/data/conf/samples/mongoatlas/cgrates.json b/data/conf/samples/mongoatlas/cgrates.json index c640a2538..0be387709 100755 --- a/data/conf/samples/mongoatlas/cgrates.json +++ b/data/conf/samples/mongoatlas/cgrates.json @@ -113,9 +113,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/conf/samples/mongoreplica/cgrates.json b/data/conf/samples/mongoreplica/cgrates.json index 5789a6ea5..f2eb2e2f8 100755 --- a/data/conf/samples/mongoreplica/cgrates.json +++ b/data/conf/samples/mongoreplica/cgrates.json @@ -85,9 +85,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/conf/samples/sessions/cgrates.json b/data/conf/samples/sessions/cgrates.json index fd55b77b1..9a7cd2a39 100644 --- a/data/conf/samples/sessions/cgrates.json +++ b/data/conf/samples/sessions/cgrates.json @@ -67,9 +67,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/conf/samples/tls/cgrates.json b/data/conf/samples/tls/cgrates.json index b920c9131..22de4360a 100755 --- a/data/conf/samples/tls/cgrates.json +++ b/data/conf/samples/tls/cgrates.json @@ -65,9 +65,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} - ], + "thresholds_conns": ["*localhost"], }, "thresholds": { diff --git a/data/conf/samples/tls_gob/cgrates.json b/data/conf/samples/tls_gob/cgrates.json index 9a96e9d56..4dcd0cd11 100755 --- a/data/conf/samples/tls_gob/cgrates.json +++ b/data/conf/samples/tls_gob/cgrates.json @@ -53,6 +53,14 @@ }, +"rpc_conns": { + "conn1": { + "strategy": "first", + "conns": [{"address": "127.0.0.1:2013", "transport":"*gob"}], + }, +}, + + "resources": { "enabled": true, "store_interval": "1s", @@ -65,9 +73,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "127.0.0.1:2013", "transport": "*gob"} - ], + "thresholds_conns": ["conn1"], }, "thresholds": { diff --git a/data/conf/samples/tutinternal/cgrates.json b/data/conf/samples/tutinternal/cgrates.json index cea07d8e6..1725dc8d0 100644 --- a/data/conf/samples/tutinternal/cgrates.json +++ b/data/conf/samples/tutinternal/cgrates.json @@ -74,9 +74,7 @@ "stats": { "enabled": true, "store_interval": "-1", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, "thresholds": { diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index d24ee1731..8bc4d40db 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -89,9 +89,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/conf/samples/tutmongo2/cgrates.json b/data/conf/samples/tutmongo2/cgrates.json index ae3547dbf..179c9e143 100644 --- a/data/conf/samples/tutmongo2/cgrates.json +++ b/data/conf/samples/tutmongo2/cgrates.json @@ -150,9 +150,7 @@ "stats": { "enabled": true, - "thresholds_conns": [ - {"address": "127.0.0.1:2012", "transport":"*json"} - ], + "thresholds_conns": ["*localhost"], "string_indexed_fields": ["Account"] }, diff --git a/data/conf/samples/tutmongo2_gob/cgrates.json b/data/conf/samples/tutmongo2_gob/cgrates.json index 8fd5876a9..5fcbcc4e5 100644 --- a/data/conf/samples/tutmongo2_gob/cgrates.json +++ b/data/conf/samples/tutmongo2_gob/cgrates.json @@ -11,7 +11,7 @@ "rpc_conns": { "conn1": { "strategy": "first", - "conns": [{{"address": "127.0.0.1:2013", "transport":"*gob"},}], + "conns": [{"address": "127.0.0.1:2013", "transport":"*gob"}], }, }, diff --git a/data/conf/samples/tutmongo_gob/cgrates.json b/data/conf/samples/tutmongo_gob/cgrates.json index 0f31263fa..db6190bff 100644 --- a/data/conf/samples/tutmongo_gob/cgrates.json +++ b/data/conf/samples/tutmongo_gob/cgrates.json @@ -89,9 +89,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/conf/samples/tutmongonew/cgrates.json b/data/conf/samples/tutmongonew/cgrates.json index 4dccd3883..1848eb668 100644 --- a/data/conf/samples/tutmongonew/cgrates.json +++ b/data/conf/samples/tutmongonew/cgrates.json @@ -88,9 +88,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index 802312978..851c95bc2 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -94,9 +94,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, "thresholds": { diff --git a/data/conf/samples/tutmysql2/cgrates.json b/data/conf/samples/tutmysql2/cgrates.json index 2e4bcb2d4..b8ac0f976 100644 --- a/data/conf/samples/tutmysql2/cgrates.json +++ b/data/conf/samples/tutmysql2/cgrates.json @@ -115,9 +115,7 @@ "stats": { "enabled": true, - "thresholds_conns": [ - {"address": "127.0.0.1:2012", "transport":"*json"} - ], + "thresholds_conns": ["*localhost"], "string_indexed_fields": ["Account"] }, diff --git a/data/conf/samples/tutmysql2_gob/cgrates.json b/data/conf/samples/tutmysql2_gob/cgrates.json index 221c6627a..c2960f24a 100644 --- a/data/conf/samples/tutmysql2_gob/cgrates.json +++ b/data/conf/samples/tutmysql2_gob/cgrates.json @@ -12,7 +12,7 @@ "rpc_conns": { "conn1": { "strategy": "first", - "conns": [{{"address": "127.0.0.1:2013", "transport":"*gob"},}], + "conns": [{"address": "127.0.0.1:2013", "transport":"*gob"}], }, }, @@ -123,9 +123,7 @@ "stats": { "enabled": true, - "thresholds_conns": [ - {"address": "127.0.0.1:2013", "transport":"*gob"} - ], + "thresholds_conns": ["conn1"], "string_indexed_fields": ["Account"] }, diff --git a/data/conf/samples/tutmysql_internal/cgrates.json b/data/conf/samples/tutmysql_internal/cgrates.json index ddd3813cf..2208599d0 100644 --- a/data/conf/samples/tutmysql_internal/cgrates.json +++ b/data/conf/samples/tutmysql_internal/cgrates.json @@ -213,9 +213,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, "thresholds": { diff --git a/data/conf/samples/tutpostgres/cgrates.json b/data/conf/samples/tutpostgres/cgrates.json index d624ca9b6..52d7325e4 100644 --- a/data/conf/samples/tutpostgres/cgrates.json +++ b/data/conf/samples/tutpostgres/cgrates.json @@ -62,9 +62,7 @@ "stats": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json b/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json index 9698418d3..a3bb280ef 100644 --- a/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json @@ -105,9 +105,7 @@ "stats": { "enabled": true, - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], "string_indexed_fields": ["Account"], }, diff --git a/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json b/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json index 067378892..831f58e67 100644 --- a/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json @@ -105,9 +105,7 @@ "stats": { "enabled": true, - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], "string_indexed_fields": ["Account"], }, diff --git a/engine/stats.go b/engine/stats.go index 0189bb2bc..5c0867014 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -21,25 +21,21 @@ package engine import ( "fmt" "math/rand" - "reflect" "sync" "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) // NewStatService initializes a StatService func NewStatService(dm *DataManager, cgrcfg *config.CGRConfig, - thdS rpcclient.ClientConnector, filterS *FilterS) (ss *StatService, err error) { - if thdS != nil && reflect.ValueOf(thdS).IsNil() { // fix nil value in interface - thdS = nil - } + filterS *FilterS, connMgr *ConnManager) (ss *StatService, err error) { + return &StatService{ dm: dm, - thdS: thdS, + connMgr: connMgr, filterS: filterS, cgrcfg: cgrcfg, storedStatQueues: make(utils.StringMap), @@ -50,7 +46,7 @@ func NewStatService(dm *DataManager, cgrcfg *config.CGRConfig, // StatService builds stats for events type StatService struct { dm *DataManager - thdS rpcclient.ClientConnector // rpc connection towards ThresholdS + connMgr *ConnManager filterS *FilterS cgrcfg *config.CGRConfig loopStoped chan struct{} @@ -270,7 +266,7 @@ func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs [ sS.ssqMux.Unlock() } } - if sS.thdS != nil { + if len(sS.cgrcfg.StatSCfg().ThresholdSConns) != 0 { var thIDs []string if len(sq.sqPrfl.ThresholdIDs) != 0 { if len(sq.sqPrfl.ThresholdIDs) == 1 && sq.sqPrfl.ThresholdIDs[0] == utils.META_NONE { @@ -294,7 +290,7 @@ func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs [ thEv.Event[metricID] = metric.GetValue() } var tIDs []string - if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && + if err := sS.connMgr.Call(sS.cgrcfg.StatSCfg().ThresholdSConns, utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf(" error: %s processing event %+v with ThresholdS.", err.Error(), thEv)) @@ -425,9 +421,3 @@ func (sS *StatService) Reload() { func (sS *StatService) StartLoop() { go sS.runBackup() } - -// SetThresholdConnection sets the new connection to the threshold service -// only used on reload -func (sS *StatService) SetThresholdConnection(thdS rpcclient.ClientConnector) { - sS.thdS = thdS -} diff --git a/engine/stats_test.go b/engine/stats_test.go index fc88c113a..1257f84d7 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -153,7 +153,7 @@ func TestStatQueuesPopulateService(t *testing.T) { defaultCfg.StatSCfg().StringIndexedFields = nil defaultCfg.StatSCfg().PrefixIndexedFields = nil statService, err = NewStatService(dmSTS, defaultCfg, - nil, &FilterS{dm: dmSTS, cfg: defaultCfg}) + &FilterS{dm: dmSTS, cfg: defaultCfg}, nil) if err != nil { t.Errorf("Error: %+v", err) } diff --git a/services/cdrs_it_test.go b/services/cdrs_it_test.go index 2dd3418a3..576eca4f2 100644 --- a/services/cdrs_it_test.go +++ b/services/cdrs_it_test.go @@ -69,8 +69,8 @@ func TestCdrsReload(t *testing.T) { cfg.StorDbCfg().Type = utils.INTERNAL stordb := NewStorDBService(cfg) chrS := NewChargerService(cfg, db, chS, filterSChan, server, nil, nil) - schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) - tS := NewThresholdService(cfg, db, chS, filterSChan, server) + schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), nil) + tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1)) ralS := NewRalService(cfg, db, stordb, chS, filterSChan, server, tS.GetIntenternalChan(), internalChan, cacheSChan, internalChan, internalChan, internalChan, schS, engineShutdown) diff --git a/services/rals_it_test.go b/services/rals_it_test.go index ce3104ae4..ed1f2cf27 100644 --- a/services/rals_it_test.go +++ b/services/rals_it_test.go @@ -67,8 +67,8 @@ func TestRalsReload(t *testing.T) { db := NewDataDBService(cfg) cfg.StorDbCfg().Type = utils.INTERNAL stordb := NewStorDBService(cfg) - schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) - tS := NewThresholdService(cfg, db, chS, filterSChan, server) + schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), nil) + tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1)) ralS := NewRalService(cfg, db, stordb, chS, filterSChan, server, tS.GetIntenternalChan(), internalChan, cacheSChan, internalChan, internalChan, internalChan, schS, engineShutdown) diff --git a/services/resources_it_test.go b/services/resources_it_test.go index 791dff249..0cfec78be 100644 --- a/services/resources_it_test.go +++ b/services/resources_it_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "github.com/cgrates/rpcclient" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -52,7 +54,7 @@ func TestResourceSReload(t *testing.T) { server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) - tS := NewThresholdService(cfg, db, chS, filterSChan, server) + tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1)) reS := NewResourceService(cfg, db, chS, filterSChan, server, tS.GetIntenternalChan(), nil) srvMngr.AddServices(NewConnManagerService(cfg, nil), tS, reS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { diff --git a/services/stats.go b/services/stats.go index 8e3eb39ed..cf96c0711 100644 --- a/services/stats.go +++ b/services/stats.go @@ -33,30 +33,27 @@ import ( // NewStatService returns the Stat Service func NewStatService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, - server *utils.Server, thrsChan, - dispatcherChan chan rpcclient.ClientConnector) servmanager.Service { + server *utils.Server, internalStatSChan chan rpcclient.RpcClientConnection, connMgr *engine.ConnManager) servmanager.Service { return &StatService{ - connChan: make(chan rpcclient.ClientConnector, 1), - cfg: cfg, - dm: dm, - cacheS: cacheS, - filterSChan: filterSChan, - server: server, - thrsChan: thrsChan, - dispatcherChan: dispatcherChan, + connChan: internalStatSChan, + cfg: cfg, + dm: dm, + cacheS: cacheS, + filterSChan: filterSChan, + server: server, + connMgr: connMgr, } } // StatService implements Service interface type StatService struct { sync.RWMutex - cfg *config.CGRConfig - dm *DataDBService - cacheS *engine.CacheS - filterSChan chan *engine.FilterS - server *utils.Server - thrsChan chan rpcclient.ClientConnector - dispatcherChan chan rpcclient.ClientConnector + cfg *config.CGRConfig + dm *DataDBService + cacheS *engine.CacheS + filterSChan chan *engine.FilterS + server *utils.Server + connMgr *engine.ConnManager sts *engine.StatService rpc *v1.StatSv1 @@ -76,14 +73,9 @@ func (sts *StatService) Start() (err error) { filterS := <-sts.filterSChan sts.filterSChan <- filterS - var thdSConn rpcclient.ClientConnector - if thdSConn, err = NewConnection(sts.cfg, sts.thrsChan, sts.dispatcherChan, sts.cfg.StatSCfg().ThresholdSConns); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.StatS, err.Error())) - return - } sts.Lock() defer sts.Unlock() - sts.sts, err = engine.NewStatService(sts.dm.GetDM(), sts.cfg, thdSConn, filterS) + sts.sts, err = engine.NewStatService(sts.dm.GetDM(), sts.cfg, filterS, sts.connMgr) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) return @@ -105,13 +97,7 @@ func (sts *StatService) GetIntenternalChan() (conn chan rpcclient.ClientConnecto // Reload handles the change of config func (sts *StatService) Reload() (err error) { - var thdSConn rpcclient.ClientConnector - if thdSConn, err = NewConnection(sts.cfg, sts.thrsChan, sts.dispatcherChan, sts.cfg.StatSCfg().ThresholdSConns); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.StatS, err.Error())) - return - } sts.Lock() - sts.sts.SetThresholdConnection(thdSConn) sts.sts.Reload() sts.Unlock() return diff --git a/services/stats_it_test.go b/services/stats_it_test.go index 764edce2d..062eab460 100644 --- a/services/stats_it_test.go +++ b/services/stats_it_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "github.com/cgrates/rpcclient" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -52,8 +54,8 @@ func TestStatSReload(t *testing.T) { server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) - tS := NewThresholdService(cfg, db, chS, filterSChan, server) - sS := NewStatService(cfg, db, chS, filterSChan, server, tS.GetIntenternalChan(), nil) + tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1)) + sS := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), nil) srvMngr.AddServices(NewConnManagerService(cfg, nil), tS, sS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err) diff --git a/services/suppliers_it_test.go b/services/suppliers_it_test.go index 22ea4b6e2..3fa015caa 100644 --- a/services/suppliers_it_test.go +++ b/services/suppliers_it_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "github.com/cgrates/rpcclient" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" @@ -50,7 +52,7 @@ func TestSupplierSReload(t *testing.T) { server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) - sts := NewStatService(cfg, db, chS, filterSChan, server, nil, nil) + sts := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), nil) supS := NewSupplierService(cfg, db, chS, filterSChan, server, nil, sts.GetIntenternalChan(), nil, nil) srvMngr.AddServices(NewConnManagerService(cfg, nil), supS, sts, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { diff --git a/services/thresholds.go b/services/thresholds.go index 532da3854..9a2ac67d9 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -33,9 +33,9 @@ import ( // NewThresholdService returns the Threshold Service func NewThresholdService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, - server *utils.Server) servmanager.Service { + server *utils.Server, internalThresholdSChan chan rpcclient.RpcClientConnection) servmanager.Service { return &ThresholdService{ - connChan: make(chan rpcclient.ClientConnector, 1), + connChan: internalThresholdSChan, cfg: cfg, dm: dm, cacheS: cacheS, diff --git a/services/thresholds_it_test.go b/services/thresholds_it_test.go index a9f3fa476..e2f1d733c 100644 --- a/services/thresholds_it_test.go +++ b/services/thresholds_it_test.go @@ -28,6 +28,7 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) func TestThresholdSReload(t *testing.T) { @@ -48,7 +49,7 @@ func TestThresholdSReload(t *testing.T) { server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) - tS := NewThresholdService(cfg, db, chS, filterSChan, server) + tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1)) srvMngr.AddServices(NewConnManagerService(cfg, nil), tS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err)