From 9435b99f1d7ce61c1c3357c3aef71cfe24a406f4 Mon Sep 17 00:00:00 2001 From: TeoV Date: Mon, 9 Dec 2019 03:58:43 -0500 Subject: [PATCH] Add connection from ThresholdS to ResourceS through ConnManager --- cmd/cgr-engine/cgr-engine.go | 5 +- config/config_it_test.go | 4 +- config/config_json_test.go | 2 +- config/config_test.go | 2 +- config/configsanity.go | 9 ++-- config/configsanity_test.go | 8 +--- config/libconfig_json.go | 2 +- config/resourcescfg.go | 14 ++++-- config/resourcescfg_test.go | 2 +- .../samples/acc_balance_keep/cgrates.json | 4 +- .../samples/acc_balance_keep_gob/cgrates.json | 4 +- data/conf/samples/cdrewithfilter/cgrates.json | 4 +- data/conf/samples/dbinternal/cgrates.json | 4 +- .../samples/loaders/tutmongo/cgrates.json | 4 +- .../samples/loaders/tutmysql/cgrates.json | 4 +- data/conf/samples/mongoatlas/cgrates.json | 4 +- data/conf/samples/mongoreplica/cgrates.json | 4 +- .../remote_replication/internal/cgrates.json | 4 +- .../internal_gob/cgrates.json | 4 +- data/conf/samples/reslimiter/cgrates.json | 3 -- data/conf/samples/tls/cgrates.json | 4 +- data/conf/samples/tls_gob/cgrates.json | 4 +- data/conf/samples/tutinternal/cgrates.json | 4 +- data/conf/samples/tutmongo/cgrates.json | 4 +- data/conf/samples/tutmongo2/cgrates.json | 4 +- data/conf/samples/tutmongo2_gob/cgrates.json | 8 +--- data/conf/samples/tutmongo_gob/cgrates.json | 4 +- data/conf/samples/tutmongonew/cgrates.json | 4 +- data/conf/samples/tutmysql/cgrates.json | 4 +- data/conf/samples/tutmysql2/cgrates.json | 4 +- data/conf/samples/tutmysql2_gob/cgrates.json | 4 +- .../samples/tutmysql_internal/cgrates.json | 4 +- data/conf/samples/tutpostgres/cgrates.json | 4 +- .../osips/cgrates/etc/cgrates/cgrates.json | 4 +- .../osips/cgrates/etc/cgrates/cgrates.json | 4 +- engine/resources.go | 15 +++--- engine/resources_test.go | 8 ++-- services/resources.go | 48 +++++++------------ services/resources_it_test.go | 2 +- 39 files changed, 80 insertions(+), 148 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 14eeb7b60..1b43a6ee9 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -496,6 +496,7 @@ func main() { internalChargerSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency internalStatSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency + internalResourceSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency // init CacheS cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan) @@ -519,7 +520,7 @@ func main() { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers): internalChargerSChan, utils.GuardianSv1: internalGuardianSChan, //utils.LoaderSv1: ldrs.GetIntenternalChan(), - //utils.ResourceSv1: reS.GetIntenternalChan(), + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources): internalResourceSChan, //utils.Responder: rals.GetResponder().GetIntenternalChan(), //utils.SchedulerSv1: schS.GetIntenternalChan(), utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS): internalSessionSChan, @@ -540,7 +541,7 @@ func main() { stS := services.NewStatService(cfg, dmService, cacheS, filterSChan, server, internalStatSChan, connManager.GetConnMgr()) reS := services.NewResourceService(cfg, dmService, cacheS, filterSChan, server, - tS.GetIntenternalChan(), dspS.GetIntenternalChan()) + internalResourceSChan, connManager.GetConnMgr()) supS := services.NewSupplierService(cfg, dmService, cacheS, filterSChan, server, attrS.GetIntenternalChan(), stS.GetIntenternalChan(), reS.GetIntenternalChan(), dspS.GetIntenternalChan()) diff --git a/config/config_it_test.go b/config/config_it_test.go index 81ea6b964..f60f28c4e 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -203,9 +203,7 @@ func TestCGRConfigReloadResourceS(t *testing.T) { StringIndexedFields: &[]string{utils.Account}, PrefixIndexedFields: &[]string{}, IndexedSelects: true, - ThresholdSConns: []*RemoteHost{ - &RemoteHost{Address: "127.0.0.1:2012", Transport: utils.MetaJSON}, - }, + ThresholdSConns: []string{utils.MetaLocalHost}, } if !reflect.DeepEqual(expAttr, cfg.ResourceSCfg()) { t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.ResourceSCfg())) diff --git a/config/config_json_test.go b/config/config_json_test.go index c1ed9e7c2..c977c7f71 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -891,7 +891,7 @@ func TestDfResourceLimiterSJsonCfg(t *testing.T) { eCfg := &ResourceSJsonCfg{ Enabled: utils.BoolPointer(false), Indexed_selects: utils.BoolPointer(true), - Thresholds_conns: &[]*RemoteHostJson{}, + Thresholds_conns: &[]string{}, Store_interval: utils.StringPointer(""), String_indexed_fields: nil, Prefix_indexed_fields: &[]string{}, diff --git a/config/config_test.go b/config/config_test.go index 15c2a41cd..b33228c35 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -829,7 +829,7 @@ func TestCgrCfgJSONDefaultsResLimCfg(t *testing.T) { eResLiCfg := &ResourceSConfig{ Enabled: false, IndexedSelects: true, - ThresholdSConns: []*RemoteHost{}, + ThresholdSConns: []string{}, StoreInterval: 0, StringIndexedFields: nil, PrefixIndexedFields: &[]string{}, diff --git a/config/configsanity.go b/config/configsanity.go index e6f7bdb52..46066019b 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -327,11 +327,14 @@ func (cfg *CGRConfig) checkConfigSanity() error { } } // ResourceLimiter checks - if cfg.resourceSCfg.Enabled && !cfg.thresholdSCfg.Enabled && !cfg.dispatcherSCfg.Enabled { - for _, connCfg := range cfg.resourceSCfg.ThresholdSConns { - if connCfg.Address == utils.MetaInternal { + if cfg.resourceSCfg.Enabled { + for _, connID := range cfg.resourceSCfg.ThresholdSConns { + if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.thresholdSCfg.Enabled { return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ThresholdS, utils.ResourceS) } + if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) { + return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.ResourceS, connID) + } } } // StatS checks diff --git a/config/configsanity_test.go b/config/configsanity_test.go index 09c3fddab..fe8a7a3d8 100644 --- a/config/configsanity_test.go +++ b/config/configsanity_test.go @@ -528,12 +528,8 @@ func TestConfigSanityHTTPAgent(t *testing.T) { func TestConfigSanityResourceLimiter(t *testing.T) { cfg, _ = NewDefaultCGRConfig() cfg.resourceSCfg = &ResourceSConfig{ - Enabled: true, - ThresholdSConns: []*RemoteHost{ - &RemoteHost{ - Address: utils.MetaInternal, - }, - }, + Enabled: true, + ThresholdSConns: []string{utils.MetaInternal}, } expected := " not enabled but requested by component." if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index b8836f62c..796e79691 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -414,7 +414,7 @@ type ChargerSJsonCfg struct { type ResourceSJsonCfg struct { Enabled *bool Indexed_selects *bool - Thresholds_conns *[]*RemoteHostJson + Thresholds_conns *[]string Store_interval *string String_indexed_fields *[]string Prefix_indexed_fields *[]string diff --git a/config/resourcescfg.go b/config/resourcescfg.go index b1d9b94cf..fbed94d7f 100644 --- a/config/resourcescfg.go +++ b/config/resourcescfg.go @@ -27,7 +27,7 @@ import ( type ResourceSConfig struct { Enabled bool IndexedSelects bool - ThresholdSConns []*RemoteHost // Connections towards StatS + ThresholdSConns []string StoreInterval time.Duration // Dump regularly from cache into dataDB StringIndexedFields *[]string PrefixIndexedFields *[]string @@ -44,10 +44,14 @@ func (rlcfg *ResourceSConfig) loadFromJsonCfg(jsnCfg *ResourceSJsonCfg) (err err rlcfg.IndexedSelects = *jsnCfg.Indexed_selects } if jsnCfg.Thresholds_conns != nil { - rlcfg.ThresholdSConns = make([]*RemoteHost, len(*jsnCfg.Thresholds_conns)) - for idx, jsnHaCfg := range *jsnCfg.Thresholds_conns { - rlcfg.ThresholdSConns[idx] = NewDfltRemoteHost() - rlcfg.ThresholdSConns[idx].loadFromJsonCfg(jsnHaCfg) + rlcfg.ThresholdSConns = make([]string, len(*jsnCfg.Thresholds_conns)) + for idx, conn := range *jsnCfg.Thresholds_conns { + // if we have the connection internal we change the name so we can have internal rpc for each subsystem + if conn == utils.MetaInternal { + rlcfg.ThresholdSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds) + } else { + rlcfg.ThresholdSConns[idx] = conn + } } } if jsnCfg.Store_interval != nil { diff --git a/config/resourcescfg_test.go b/config/resourcescfg_test.go index edeb79035..0e6ba68ff 100644 --- a/config/resourcescfg_test.go +++ b/config/resourcescfg_test.go @@ -47,7 +47,7 @@ func TestResourceSConfigloadFromJsonCfg(t *testing.T) { expected = ResourceSConfig{ Enabled: true, StoreInterval: time.Duration(time.Second), - ThresholdSConns: []*RemoteHost{}, + ThresholdSConns: []string{}, PrefixIndexedFields: &[]string{"index1", "index2"}, } if jsnCfg, err := NewCgrJsonCfgFromBytes([]byte(cfgJSONStr)); err != nil { diff --git a/data/conf/samples/acc_balance_keep/cgrates.json b/data/conf/samples/acc_balance_keep/cgrates.json index 86e2f1294..e89b26be7 100644 --- a/data/conf/samples/acc_balance_keep/cgrates.json +++ b/data/conf/samples/acc_balance_keep/cgrates.json @@ -83,9 +83,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/conf/samples/acc_balance_keep_gob/cgrates.json b/data/conf/samples/acc_balance_keep_gob/cgrates.json index 97e25f6c8..beda5fbf9 100644 --- a/data/conf/samples/acc_balance_keep_gob/cgrates.json +++ b/data/conf/samples/acc_balance_keep_gob/cgrates.json @@ -83,9 +83,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"] }, diff --git a/data/conf/samples/cdrewithfilter/cgrates.json b/data/conf/samples/cdrewithfilter/cgrates.json index 803577668..f930ad931 100755 --- a/data/conf/samples/cdrewithfilter/cgrates.json +++ b/data/conf/samples/cdrewithfilter/cgrates.json @@ -85,9 +85,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"] }, diff --git a/data/conf/samples/dbinternal/cgrates.json b/data/conf/samples/dbinternal/cgrates.json index 21a923cf0..f31f41cdc 100755 --- a/data/conf/samples/dbinternal/cgrates.json +++ b/data/conf/samples/dbinternal/cgrates.json @@ -45,9 +45,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"] }, diff --git a/data/conf/samples/loaders/tutmongo/cgrates.json b/data/conf/samples/loaders/tutmongo/cgrates.json index a6603bb79..a8138771e 100644 --- a/data/conf/samples/loaders/tutmongo/cgrates.json +++ b/data/conf/samples/loaders/tutmongo/cgrates.json @@ -79,9 +79,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"] }, diff --git a/data/conf/samples/loaders/tutmysql/cgrates.json b/data/conf/samples/loaders/tutmysql/cgrates.json index 016246b6e..88eaa67fd 100644 --- a/data/conf/samples/loaders/tutmysql/cgrates.json +++ b/data/conf/samples/loaders/tutmysql/cgrates.json @@ -122,9 +122,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"] }, diff --git a/data/conf/samples/mongoatlas/cgrates.json b/data/conf/samples/mongoatlas/cgrates.json index 0be387709..2fe85301e 100755 --- a/data/conf/samples/mongoatlas/cgrates.json +++ b/data/conf/samples/mongoatlas/cgrates.json @@ -104,9 +104,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"] }, diff --git a/data/conf/samples/mongoreplica/cgrates.json b/data/conf/samples/mongoreplica/cgrates.json index f2eb2e2f8..2f9139bce 100755 --- a/data/conf/samples/mongoreplica/cgrates.json +++ b/data/conf/samples/mongoreplica/cgrates.json @@ -76,9 +76,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"] }, diff --git a/data/conf/samples/remote_replication/internal/cgrates.json b/data/conf/samples/remote_replication/internal/cgrates.json index 053e43a72..98060a329 100644 --- a/data/conf/samples/remote_replication/internal/cgrates.json +++ b/data/conf/samples/remote_replication/internal/cgrates.json @@ -70,9 +70,7 @@ "resources": { "enabled": true, "store_interval": "-1", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/conf/samples/remote_replication/internal_gob/cgrates.json b/data/conf/samples/remote_replication/internal_gob/cgrates.json index 7bb63f74c..eeb37e445 100644 --- a/data/conf/samples/remote_replication/internal_gob/cgrates.json +++ b/data/conf/samples/remote_replication/internal_gob/cgrates.json @@ -70,9 +70,7 @@ "resources": { "enabled": true, "store_interval": "-1", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/conf/samples/reslimiter/cgrates.json b/data/conf/samples/reslimiter/cgrates.json index 2a987c829..c475ff147 100644 --- a/data/conf/samples/reslimiter/cgrates.json +++ b/data/conf/samples/reslimiter/cgrates.json @@ -30,9 +30,6 @@ "resources": { "enabled": true, - "stats_conns": [ - //{"address": "*internal"} - ], "cache_dump_interval": "0s", "usage_ttl": "3h", }, diff --git a/data/conf/samples/tls/cgrates.json b/data/conf/samples/tls/cgrates.json index 22de4360a..54298bf30 100755 --- a/data/conf/samples/tls/cgrates.json +++ b/data/conf/samples/tls/cgrates.json @@ -56,9 +56,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "127.0.0.1:2012", "transport": "*json"} - ], + "thresholds_conns": ["*localhost"], }, diff --git a/data/conf/samples/tls_gob/cgrates.json b/data/conf/samples/tls_gob/cgrates.json index 4dcd0cd11..dbbd1f504 100755 --- a/data/conf/samples/tls_gob/cgrates.json +++ b/data/conf/samples/tls_gob/cgrates.json @@ -64,9 +64,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "127.0.0.1:2013", "transport": "*gob"} - ], + "thresholds_conns": ["conn1"], }, diff --git a/data/conf/samples/tutinternal/cgrates.json b/data/conf/samples/tutinternal/cgrates.json index 1725dc8d0..f9e084698 100644 --- a/data/conf/samples/tutinternal/cgrates.json +++ b/data/conf/samples/tutinternal/cgrates.json @@ -65,9 +65,7 @@ "resources": { "enabled": true, "store_interval": "-1", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"] }, diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index 8bc4d40db..6eb9564fe 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -80,9 +80,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"] }, diff --git a/data/conf/samples/tutmongo2/cgrates.json b/data/conf/samples/tutmongo2/cgrates.json index 179c9e143..b2963e252 100644 --- a/data/conf/samples/tutmongo2/cgrates.json +++ b/data/conf/samples/tutmongo2/cgrates.json @@ -141,9 +141,7 @@ "resources": { "enabled": true, - "thresholds_conns": [ - {"address": "127.0.0.1:2012", "transport":"*json"} - ], + "thresholds_conns": ["*localhost"], "string_indexed_fields": ["Account"] }, diff --git a/data/conf/samples/tutmongo2_gob/cgrates.json b/data/conf/samples/tutmongo2_gob/cgrates.json index 5fcbcc4e5..ddc10f10d 100644 --- a/data/conf/samples/tutmongo2_gob/cgrates.json +++ b/data/conf/samples/tutmongo2_gob/cgrates.json @@ -148,18 +148,14 @@ "resources": { "enabled": true, - "thresholds_conns": [ - {"address": "127.0.0.1:2013", "transport":"*gob"} - ], + "thresholds_conns": ["conn1"], "string_indexed_fields": ["Account"] }, "stats": { "enabled": true, - "thresholds_conns": [ - {"address": "127.0.0.1:2013", "transport":"*gob"} - ], + "thresholds_conns": ["conn1"], "string_indexed_fields": ["Account"] }, diff --git a/data/conf/samples/tutmongo_gob/cgrates.json b/data/conf/samples/tutmongo_gob/cgrates.json index db6190bff..953e12809 100644 --- a/data/conf/samples/tutmongo_gob/cgrates.json +++ b/data/conf/samples/tutmongo_gob/cgrates.json @@ -80,9 +80,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"] }, diff --git a/data/conf/samples/tutmongonew/cgrates.json b/data/conf/samples/tutmongonew/cgrates.json index 1848eb668..c12112c9d 100644 --- a/data/conf/samples/tutmongonew/cgrates.json +++ b/data/conf/samples/tutmongonew/cgrates.json @@ -79,9 +79,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"] }, diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index 851c95bc2..c67162025 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -85,9 +85,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"] }, diff --git a/data/conf/samples/tutmysql2/cgrates.json b/data/conf/samples/tutmysql2/cgrates.json index b8ac0f976..572616137 100644 --- a/data/conf/samples/tutmysql2/cgrates.json +++ b/data/conf/samples/tutmysql2/cgrates.json @@ -106,9 +106,7 @@ "resources": { "enabled": true, - "thresholds_conns": [ - {"address": "127.0.0.1:2012", "transport":"*json"} - ], + "thresholds_conns": ["*localhost"], "string_indexed_fields": ["Account"] }, diff --git a/data/conf/samples/tutmysql2_gob/cgrates.json b/data/conf/samples/tutmysql2_gob/cgrates.json index c2960f24a..fef2c652c 100644 --- a/data/conf/samples/tutmysql2_gob/cgrates.json +++ b/data/conf/samples/tutmysql2_gob/cgrates.json @@ -114,9 +114,7 @@ "resources": { "enabled": true, - "thresholds_conns": [ - {"address": "127.0.0.1:2013", "transport":"*gob"} - ], + "thresholds_conns": ["conn1"], "string_indexed_fields": ["Account"] }, diff --git a/data/conf/samples/tutmysql_internal/cgrates.json b/data/conf/samples/tutmysql_internal/cgrates.json index 2208599d0..35cb69173 100644 --- a/data/conf/samples/tutmysql_internal/cgrates.json +++ b/data/conf/samples/tutmysql_internal/cgrates.json @@ -204,9 +204,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], }, diff --git a/data/conf/samples/tutpostgres/cgrates.json b/data/conf/samples/tutpostgres/cgrates.json index 52d7325e4..ff6c32ea7 100644 --- a/data/conf/samples/tutpostgres/cgrates.json +++ b/data/conf/samples/tutpostgres/cgrates.json @@ -53,9 +53,7 @@ "resources": { "enabled": true, "store_interval": "1s", - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"] }, diff --git a/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json b/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json index a3bb280ef..8f9549ae2 100644 --- a/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json @@ -95,9 +95,7 @@ "resources": { "enabled": true, - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], "string_indexed_fields": ["Account"], "prefix_indexed_fields": ["Destination"], }, diff --git a/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json b/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json index 831f58e67..de1c36c97 100644 --- a/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json @@ -95,9 +95,7 @@ "resources": { "enabled": true, - "thresholds_conns": [ - {"address": "*internal"} - ], + "thresholds_conns": ["*internal"], "string_indexed_fields": ["Account"], "prefix_indexed_fields": ["Destination"], }, diff --git a/engine/resources.go b/engine/resources.go index 80b85d74a..0c90e936d 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -21,7 +21,6 @@ package engine import ( "fmt" "math/rand" - "reflect" "sort" "sync" "time" @@ -298,16 +297,15 @@ func (rs Resources) allocateResource(ru *ResourceUsage, dryRun bool) (alcMessage // NewResourceService returns a new ResourceService func NewResourceService(dm *DataManager, cgrcfg *config.CGRConfig, - thdS rpcclient.ClientConnector, filterS *FilterS) (*ResourceService, error) { - if thdS != nil && reflect.ValueOf(thdS).IsNil() { - thdS = nil - } - return &ResourceService{dm: dm, thdS: thdS, + filterS *FilterS, connMgr *ConnManager) (*ResourceService, error) { + return &ResourceService{dm: dm, storedResources: make(utils.StringMap), cgrcfg: cgrcfg, filterS: filterS, loopStoped: make(chan struct{}), - stopBackup: make(chan struct{})}, nil + stopBackup: make(chan struct{}), + connMgr: connMgr}, nil + } // ResourceService is the service handling resources @@ -320,6 +318,7 @@ type ResourceService struct { cgrcfg *config.CGRConfig stopBackup chan struct{} // control storing process loopStoped chan struct{} + connMgr *ConnManager } // Called to start the service @@ -435,7 +434,7 @@ func (rS *ResourceService) processThresholds(r *Resource, argDispatcher *utils.A ArgDispatcher: argDispatcher, } var tIDs []string - if err = rS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && + if err = rS.connMgr.Call(rS.cgrcfg.ResourceSCfg().ThresholdSConns, utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing event %+v with %s.", diff --git a/engine/resources_test.go b/engine/resources_test.go index dd22131c1..896903ec9 100644 --- a/engine/resources_test.go +++ b/engine/resources_test.go @@ -372,8 +372,8 @@ func TestResourcePopulateResourceService(t *testing.T) { defaultCfg.ResourceSCfg().StoreInterval = 1 defaultCfg.ResourceSCfg().StringIndexedFields = nil defaultCfg.ResourceSCfg().PrefixIndexedFields = nil - resService, err = NewResourceService(dmRES, defaultCfg, nil, - &FilterS{dm: dmRES, cfg: defaultCfg}) + resService, err = NewResourceService(dmRES, defaultCfg, + &FilterS{dm: dmRES, cfg: defaultCfg}, nil) if err != nil { t.Errorf("Error: %+v", err) } @@ -734,8 +734,8 @@ func TestResourceCaching(t *testing.T) { defaultCfg.ResourceSCfg().StoreInterval = 1 defaultCfg.ResourceSCfg().StringIndexedFields = nil defaultCfg.ResourceSCfg().PrefixIndexedFields = nil - resService, err = NewResourceService(dmRES, defaultCfg, nil, - &FilterS{dm: dmRES, cfg: defaultCfg}) + resService, err = NewResourceService(dmRES, defaultCfg, + &FilterS{dm: dmRES, cfg: defaultCfg}, nil) if err != nil { t.Errorf("Error: %+v", err) } diff --git a/services/resources.go b/services/resources.go index e8fc19f91..970a3231a 100644 --- a/services/resources.go +++ b/services/resources.go @@ -33,34 +33,32 @@ import ( // NewResourceService returns the Resource Service func NewResourceService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, - server *utils.Server, thrsChan, - dispatcherChan chan rpcclient.ClientConnector) servmanager.Service { + server *utils.Server, internalResourceSChan chan rpcclient.RpcClientConnection, + connMgr *engine.ConnManager) servmanager.Service { return &ResourceService{ - connChan: make(chan rpcclient.ClientConnector, 1), - cfg: cfg, - dm: dm, - cacheS: cacheS, - filterSChan: filterSChan, - server: server, - thrsChan: thrsChan, - dispatcherChan: dispatcherChan, + connChan: internalResourceSChan, + cfg: cfg, + dm: dm, + cacheS: cacheS, + filterSChan: filterSChan, + server: server, + connMgr: connMgr, } } // ResourceService implements Service interface type ResourceService struct { sync.RWMutex - cfg *config.CGRConfig - dm *DataDBService - cacheS *engine.CacheS - filterSChan chan *engine.FilterS - server *utils.Server - thrsChan chan rpcclient.ClientConnector - dispatcherChan chan rpcclient.ClientConnector + cfg *config.CGRConfig + dm *DataDBService + cacheS *engine.CacheS + filterSChan chan *engine.FilterS + server *utils.Server reS *engine.ResourceService rpc *v1.ResourceSv1 - connChan chan rpcclient.ClientConnector + connChan chan rpcclient.RpcClientConnection + connMgr *engine.ConnManager } // Start should handle the sercive start @@ -76,15 +74,9 @@ func (reS *ResourceService) Start() (err error) { filterS := <-reS.filterSChan reS.filterSChan <- filterS - var thdSConn rpcclient.ClientConnector - if thdSConn, err = NewConnection(reS.cfg, reS.thrsChan, reS.dispatcherChan, reS.cfg.ResourceSCfg().ThresholdSConns); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.ResourceS, err.Error())) - return - } - reS.Lock() defer reS.Unlock() - reS.reS, err = engine.NewResourceService(reS.dm.GetDM(), reS.cfg, thdSConn, filterS) + reS.reS, err = engine.NewResourceService(reS.dm.GetDM(), reS.cfg, filterS, reS.connMgr) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ResourceS, err.Error())) return @@ -106,13 +98,7 @@ func (reS *ResourceService) GetIntenternalChan() (conn chan rpcclient.ClientConn // Reload handles the change of config func (reS *ResourceService) Reload() (err error) { - var thdSConn rpcclient.ClientConnector - if thdSConn, err = NewConnection(reS.cfg, reS.thrsChan, reS.dispatcherChan, reS.cfg.ResourceSCfg().ThresholdSConns); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.ResourceS, err.Error())) - return - } reS.Lock() - reS.reS.SetThresholdConnection(thdSConn) reS.reS.Reload() reS.Unlock() return diff --git a/services/resources_it_test.go b/services/resources_it_test.go index 0cfec78be..9a74987f8 100644 --- a/services/resources_it_test.go +++ b/services/resources_it_test.go @@ -55,7 +55,7 @@ func TestResourceSReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1)) - reS := NewResourceService(cfg, db, chS, filterSChan, server, tS.GetIntenternalChan(), nil) + reS := NewResourceService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), nil) srvMngr.AddServices(NewConnManagerService(cfg, nil), tS, reS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err)