diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 1b43a6ee9..ad7d19494 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -497,6 +497,7 @@ func main() { internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency internalStatSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency internalResourceSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency + internalSupplierSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency // init CacheS cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan) @@ -542,9 +543,7 @@ func main() { internalStatSChan, connManager.GetConnMgr()) reS := services.NewResourceService(cfg, dmService, cacheS, filterSChan, server, internalResourceSChan, connManager.GetConnMgr()) - supS := services.NewSupplierService(cfg, dmService, cacheS, filterSChan, server, - attrS.GetIntenternalChan(), stS.GetIntenternalChan(), - reS.GetIntenternalChan(), dspS.GetIntenternalChan()) + supS := services.NewSupplierService(cfg, dmService, cacheS, filterSChan, server, internalSupplierSChan, connManager.GetConnMgr()) schS := services.NewSchedulerService(cfg, dmService, cacheS, filterSChan, server, internalCDRServerChan, dspS.GetIntenternalChan()) rals := services.NewRalService(cfg, dmService, storDBService, cacheS, filterSChan, server, tS.GetIntenternalChan(), stS.GetIntenternalChan(), internalCacheSChan, diff --git a/config/config_it_test.go b/config/config_it_test.go index f60f28c4e..d15acddfe 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -228,9 +228,9 @@ func TestCGRConfigReloadSupplierS(t *testing.T) { Enabled: true, StringIndexedFields: &[]string{"LCRProfile"}, PrefixIndexedFields: &[]string{utils.Destination}, - ResourceSConns: []*RemoteHost{}, - StatSConns: []*RemoteHost{}, - AttributeSConns: []*RemoteHost{}, + ResourceSConns: []string{}, + StatSConns: []string{}, + AttributeSConns: []string{}, IndexedSelects: true, DefaultRatio: 1, } diff --git a/config/config_json_test.go b/config/config_json_test.go index c977c7f71..f2e3ff999 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -941,9 +941,9 @@ func TestDfSupplierSJsonCfg(t *testing.T) { Indexed_selects: utils.BoolPointer(true), String_indexed_fields: nil, Prefix_indexed_fields: &[]string{}, - Attributes_conns: &[]*RemoteHostJson{}, - Resources_conns: &[]*RemoteHostJson{}, - Stats_conns: &[]*RemoteHostJson{}, + Attributes_conns: &[]string{}, + Resources_conns: &[]string{}, + Stats_conns: &[]string{}, Default_ratio: utils.IntPointer(1), } if cfg, err := dfCgrJsonCfg.SupplierSJsonCfg(); err != nil { diff --git a/config/config_test.go b/config/config_test.go index b33228c35..482059311 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -873,9 +873,9 @@ func TestCgrCfgJSONDefaultSupplierSCfg(t *testing.T) { IndexedSelects: true, StringIndexedFields: nil, PrefixIndexedFields: &[]string{}, - AttributeSConns: []*RemoteHost{}, - ResourceSConns: []*RemoteHost{}, - StatSConns: []*RemoteHost{}, + AttributeSConns: []string{}, + ResourceSConns: []string{}, + StatSConns: []string{}, DefaultRatio: 1, } if !reflect.DeepEqual(eSupplSCfg, cgrCfg.supplierSCfg) { diff --git a/config/configsanity.go b/config/configsanity.go index 46066019b..21269c347 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -349,26 +349,29 @@ func (cfg *CGRConfig) checkConfigSanity() error { } } // SupplierS checks - if cfg.supplierSCfg.Enabled && !cfg.dispatcherSCfg.Enabled { - if !cfg.resourceSCfg.Enabled { - for _, connCfg := range cfg.supplierSCfg.ResourceSConns { - if connCfg.Address == utils.MetaInternal { - return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ResourceS, utils.SupplierS) - } + if cfg.supplierSCfg.Enabled { + for _, connID := range cfg.supplierSCfg.AttributeSConns { + if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.attributeSCfg.Enabled { + return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.AttributeS, utils.SupplierS) + } + if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) { + return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.SupplierS, connID) } } - if !cfg.statsCfg.Enabled { - for _, connCfg := range cfg.supplierSCfg.StatSConns { - if connCfg.Address == utils.MetaInternal { - return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.StatService, utils.SupplierS) - } + for _, connID := range cfg.supplierSCfg.StatSConns { + if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.statsCfg.Enabled { + return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.StatService, utils.SupplierS) + } + if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) { + return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.SupplierS, connID) } } - if !cfg.attributeSCfg.Enabled { - for _, connCfg := range cfg.supplierSCfg.AttributeSConns { - if connCfg.Address == utils.MetaInternal { - return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.AttributeS, utils.SupplierS) - } + for _, connID := range cfg.supplierSCfg.ResourceSConns { + if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.resourceSCfg.Enabled { + return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ResourceS, utils.SupplierS) + } + if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) { + return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.SupplierS, connID) } } } diff --git a/config/configsanity_test.go b/config/configsanity_test.go index fe8a7a3d8..0f23d4ed2 100644 --- a/config/configsanity_test.go +++ b/config/configsanity_test.go @@ -553,33 +553,22 @@ func TestConfigSanitySupplierS(t *testing.T) { cfg, _ = NewDefaultCGRConfig() cfg.supplierSCfg.Enabled = true - cfg.supplierSCfg.ResourceSConns = []*RemoteHost{ - &RemoteHost{ - Address: utils.MetaInternal, - }, - } + cfg.supplierSCfg.ResourceSConns = []string{utils.MetaInternal} + expected := " not enabled but requested by component." if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } cfg.resourceSCfg.Enabled = true - cfg.supplierSCfg.StatSConns = []*RemoteHost{ - &RemoteHost{ - Address: utils.MetaInternal, - }, - } + cfg.supplierSCfg.StatSConns = []string{utils.MetaInternal} expected = " not enabled but requested by component." if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } cfg.statsCfg.Enabled = true - cfg.supplierSCfg.AttributeSConns = []*RemoteHost{ - &RemoteHost{ - Address: utils.MetaInternal, - }, - } + cfg.supplierSCfg.AttributeSConns = []string{utils.MetaInternal} expected = " not enabled but requested by component." if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 796e79691..c3adb8bfa 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -446,9 +446,9 @@ type SupplierSJsonCfg struct { Indexed_selects *bool String_indexed_fields *[]string Prefix_indexed_fields *[]string - Attributes_conns *[]*RemoteHostJson - Resources_conns *[]*RemoteHostJson - Stats_conns *[]*RemoteHostJson + Attributes_conns *[]string + Resources_conns *[]string + Stats_conns *[]string Default_ratio *int } diff --git a/config/supplierscfg.go b/config/supplierscfg.go index ae23d377e..d1c4d3f7e 100644 --- a/config/supplierscfg.go +++ b/config/supplierscfg.go @@ -18,15 +18,17 @@ along with this program. If not, see package config +import "github.com/cgrates/cgrates/utils" + // SupplierSCfg is the configuration of supplier service type SupplierSCfg struct { Enabled bool IndexedSelects bool StringIndexedFields *[]string PrefixIndexedFields *[]string - AttributeSConns []*RemoteHost - ResourceSConns []*RemoteHost - StatSConns []*RemoteHost + AttributeSConns []string + ResourceSConns []string + StatSConns []string DefaultRatio int } @@ -55,24 +57,36 @@ func (spl *SupplierSCfg) loadFromJsonCfg(jsnCfg *SupplierSJsonCfg) (err error) { spl.PrefixIndexedFields = &pif } if jsnCfg.Attributes_conns != nil { - spl.AttributeSConns = make([]*RemoteHost, len(*jsnCfg.Attributes_conns)) - for idx, jsnHaCfg := range *jsnCfg.Attributes_conns { - spl.AttributeSConns[idx] = NewDfltRemoteHost() - spl.AttributeSConns[idx].loadFromJsonCfg(jsnHaCfg) + spl.AttributeSConns = make([]string, len(*jsnCfg.Attributes_conns)) + for idx, conn := range *jsnCfg.Attributes_conns { + // if we have the connection internal we change the name so we can have internal rpc for each subsystem + if conn == utils.MetaInternal { + spl.AttributeSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes) + } else { + spl.AttributeSConns[idx] = conn + } } } if jsnCfg.Resources_conns != nil { - spl.ResourceSConns = make([]*RemoteHost, len(*jsnCfg.Resources_conns)) - for idx, jsnHaCfg := range *jsnCfg.Resources_conns { - spl.ResourceSConns[idx] = NewDfltRemoteHost() - spl.ResourceSConns[idx].loadFromJsonCfg(jsnHaCfg) + spl.ResourceSConns = make([]string, len(*jsnCfg.Resources_conns)) + for idx, conn := range *jsnCfg.Resources_conns { + // if we have the connection internal we change the name so we can have internal rpc for each subsystem + if conn == utils.MetaInternal { + spl.ResourceSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources) + } else { + spl.ResourceSConns[idx] = conn + } } } if jsnCfg.Stats_conns != nil { - spl.StatSConns = make([]*RemoteHost, len(*jsnCfg.Stats_conns)) - for idx, jsnHaCfg := range *jsnCfg.Stats_conns { - spl.StatSConns[idx] = NewDfltRemoteHost() - spl.StatSConns[idx].loadFromJsonCfg(jsnHaCfg) + spl.StatSConns = make([]string, len(*jsnCfg.Stats_conns)) + for idx, conn := range *jsnCfg.Stats_conns { + // if we have the connection internal we change the name so we can have internal rpc for each subsystem + if conn == utils.MetaInternal { + spl.StatSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats) + } else { + spl.StatSConns[idx] = conn + } } } if jsnCfg.Default_ratio != nil { diff --git a/config/supplierscfg_test.go b/config/supplierscfg_test.go index 58b13751a..beebfd4da 100644 --- a/config/supplierscfg_test.go +++ b/config/supplierscfg_test.go @@ -47,9 +47,9 @@ func TestSupplierSCfgloadFromJsonCfg(t *testing.T) { }` expected = SupplierSCfg{ PrefixIndexedFields: &[]string{"index1", "index2"}, - AttributeSConns: []*RemoteHost{}, - ResourceSConns: []*RemoteHost{}, - StatSConns: []*RemoteHost{}, + AttributeSConns: []string{}, + ResourceSConns: []string{}, + StatSConns: []string{}, DefaultRatio: 1, } if jsnCfg, err := NewCgrJsonCfgFromBytes([]byte(cfgJSONStr)); err != nil { diff --git a/data/conf/samples/acc_balance_keep/cgrates.json b/data/conf/samples/acc_balance_keep/cgrates.json index e89b26be7..9f4fef9b3 100644 --- a/data/conf/samples/acc_balance_keep/cgrates.json +++ b/data/conf/samples/acc_balance_keep/cgrates.json @@ -102,9 +102,7 @@ "suppliers": { "enabled": true, - "stats_conns": [ - {"address": "*internal"}, - ], + "stats_conns": ["*internal"], }, diff --git a/data/conf/samples/acc_balance_keep_gob/cgrates.json b/data/conf/samples/acc_balance_keep_gob/cgrates.json index beda5fbf9..908fa2b5e 100644 --- a/data/conf/samples/acc_balance_keep_gob/cgrates.json +++ b/data/conf/samples/acc_balance_keep_gob/cgrates.json @@ -102,9 +102,7 @@ "suppliers": { "enabled": true, - "stats_conns": [ - {"address": "*internal"}, - ], + "stats_conns": ["*internal"], }, diff --git a/data/conf/samples/cdrewithfilter/cgrates.json b/data/conf/samples/cdrewithfilter/cgrates.json index f930ad931..44ce120fd 100755 --- a/data/conf/samples/cdrewithfilter/cgrates.json +++ b/data/conf/samples/cdrewithfilter/cgrates.json @@ -103,9 +103,7 @@ "suppliers": { "enabled": true, - "stats_conns": [ - {"address": "*internal"}, - ], + "stats_conns": ["*localhost"], }, diff --git a/data/conf/samples/cluelrn/cgrates.json b/data/conf/samples/cluelrn/cgrates.json index 98295b407..7d35cf9a4 100644 --- a/data/conf/samples/cluelrn/cgrates.json +++ b/data/conf/samples/cluelrn/cgrates.json @@ -51,9 +51,7 @@ "suppliers": { "enabled": true, - "attributes_conns": [ - {"address": "127.0.0.1:2012", "transport":"*json"} - ], + "attributes_conns": ["*localhost"], }, diff --git a/data/conf/samples/loaders/tutmongo/cgrates.json b/data/conf/samples/loaders/tutmongo/cgrates.json index a8138771e..68b95d148 100644 --- a/data/conf/samples/loaders/tutmongo/cgrates.json +++ b/data/conf/samples/loaders/tutmongo/cgrates.json @@ -98,12 +98,8 @@ "suppliers": { "enabled": true, - "stats_conns": [ - {"address": "127.0.0.1:2012", "transport":"*json"}, - ], - "resources_conns": [ - {"address": "127.0.0.1:2012", "transport":"*json"}, - ], + "stats_conns": ["*localhost"], + "resources_conns": ["*localhost"], }, diff --git a/data/conf/samples/loaders/tutmysql/cgrates.json b/data/conf/samples/loaders/tutmysql/cgrates.json index 88eaa67fd..156a09a44 100644 --- a/data/conf/samples/loaders/tutmysql/cgrates.json +++ b/data/conf/samples/loaders/tutmysql/cgrates.json @@ -141,12 +141,8 @@ "suppliers": { "enabled": true, "prefix_indexed_fields":["Destination"], - "stats_conns": [ - {"address": "*internal"}, - ], - "resources_conns": [ - {"address": "*internal"}, - ], + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], }, diff --git a/data/conf/samples/tutinternal/cgrates.json b/data/conf/samples/tutinternal/cgrates.json index f9e084698..37eecb2e5 100644 --- a/data/conf/samples/tutinternal/cgrates.json +++ b/data/conf/samples/tutinternal/cgrates.json @@ -84,12 +84,8 @@ "suppliers": { "enabled": true, "prefix_indexed_fields":["Destination"], - "stats_conns": [ - {"address": "*internal"}, - ], - "resources_conns": [ - {"address": "*internal"}, - ], + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], }, diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index 6eb9564fe..9155d5058 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -99,12 +99,8 @@ "suppliers": { "enabled": true, - "stats_conns": [ - {"address": "127.0.0.1:2012", "transport":"*json"}, - ], - "resources_conns": [ - {"address": "127.0.0.1:2012", "transport":"*json"}, - ], + "stats_conns": ["*localhost"], + "resources_conns": ["*localhost"], }, diff --git a/data/conf/samples/tutmongo_gob/cgrates.json b/data/conf/samples/tutmongo_gob/cgrates.json index 953e12809..2d136c2a1 100644 --- a/data/conf/samples/tutmongo_gob/cgrates.json +++ b/data/conf/samples/tutmongo_gob/cgrates.json @@ -99,12 +99,8 @@ "suppliers": { "enabled": true, - "stats_conns": [ - {"address": "127.0.0.1:2013", "transport":"*gob"}, - ], - "resources_conns": [ - {"address": "127.0.0.1:2013", "transport":"*gob"}, - ], + "stats_conns": ["conn1"], + "resources_conns": ["conn1"], }, diff --git a/data/conf/samples/tutmongonew/cgrates.json b/data/conf/samples/tutmongonew/cgrates.json index c12112c9d..4b09eb643 100644 --- a/data/conf/samples/tutmongonew/cgrates.json +++ b/data/conf/samples/tutmongonew/cgrates.json @@ -98,9 +98,7 @@ "suppliers": { "enabled": true, - "stats_conns": [ - {"address": "*internal"}, - ], + "stats_conns": ["*internal"], }, diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index c67162025..22e1a3ae8 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -104,12 +104,8 @@ "suppliers": { "enabled": true, "prefix_indexed_fields":["Destination"], - "stats_conns": [ - {"address": "*internal"}, - ], - "resources_conns": [ - {"address": "*internal"}, - ], + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], }, diff --git a/data/conf/samples/tutmysql_internal/cgrates.json b/data/conf/samples/tutmysql_internal/cgrates.json index 35cb69173..12ee29947 100644 --- a/data/conf/samples/tutmysql_internal/cgrates.json +++ b/data/conf/samples/tutmysql_internal/cgrates.json @@ -223,9 +223,7 @@ "suppliers": { "enabled": true, "prefix_indexed_fields":["Destination",], - "stats_conns": [ - {"address": "*internal"}, - ], + "stats_conns": ["*internal"], }, diff --git a/data/tutorial_tests/asterisk_ari/cgrates/etc/cgrates/cgrates.json b/data/tutorial_tests/asterisk_ari/cgrates/etc/cgrates/cgrates.json index b0dfe8613..eb94d966e 100644 --- a/data/tutorial_tests/asterisk_ari/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorial_tests/asterisk_ari/cgrates/etc/cgrates/cgrates.json @@ -127,12 +127,8 @@ "suppliers": { "enabled": true, - "resources_conns": [ - {"address": "*internal"}, - ], - "stats_conns": [ - {"address": "*internal"}, - ], + "resources_conns": ["*internal"], + "stats_conns": ["*internal"], "string_indexed_fields": ["Account"], }, diff --git a/data/tutorial_tests/fs_evsock/cgrates/etc/cgrates/cgrates.json b/data/tutorial_tests/fs_evsock/cgrates/etc/cgrates/cgrates.json index c277fa694..b0b280890 100644 --- a/data/tutorial_tests/fs_evsock/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorial_tests/fs_evsock/cgrates/etc/cgrates/cgrates.json @@ -124,12 +124,8 @@ "suppliers": { "enabled": true, - "resources_conns": [ - {"address": "*internal"}, - ], - "stats_conns": [ - {"address": "*internal"}, - ], + "resources_conns": ["*internal"], + "stats_conns": ["*internal"], "string_indexed_fields": ["Account"], }, diff --git a/data/tutorial_tests/kamevapi/cgrates/etc/cgrates/cgrates.json b/data/tutorial_tests/kamevapi/cgrates/etc/cgrates/cgrates.json index 20f879a5a..cf0bfa807 100644 --- a/data/tutorial_tests/kamevapi/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorial_tests/kamevapi/cgrates/etc/cgrates/cgrates.json @@ -123,12 +123,8 @@ "suppliers": { "enabled": true, - "resources_conns": [ - {"address": "*internal"}, - ], - "stats_conns": [ - {"address": "*internal"}, - ], + "resources_conns": ["*internal"], + "stats_conns": ["*internal"], "string_indexed_fields": ["Account"], }, diff --git a/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json b/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json index 8f9549ae2..dcbd618e5 100644 --- a/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorial_tests/osips/cgrates/etc/cgrates/cgrates.json @@ -116,12 +116,8 @@ "suppliers": { "enabled": true, - "resources_conns": [ - {"address": "*internal"} - ], - "stats_conns": [ - {"address": "*internal"} - ], + "resources_conns": ["*internal"], + "stats_conns": ["*internal"], "string_indexed_fields": ["Account"], "prefix_indexed_fields": ["Destination"], }, diff --git a/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json b/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json index 90b7e3ea2..248f8af62 100644 --- a/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json @@ -127,12 +127,8 @@ "suppliers": { "enabled": true, - "resources_conns": [ - {"address": "*internal"}, - ], - "stats_conns": [ - {"address": "*internal"}, - ], + "resources_conns": ["*internal"], + "stats_conns": ["*internal"], "string_indexed_fields": ["Account"], }, diff --git a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json index c277fa694..b0b280890 100644 --- a/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/fs_evsock/cgrates/etc/cgrates/cgrates.json @@ -124,12 +124,8 @@ "suppliers": { "enabled": true, - "resources_conns": [ - {"address": "*internal"}, - ], - "stats_conns": [ - {"address": "*internal"}, - ], + "resources_conns": ["*internal"], + "stats_conns": ["*internal"], "string_indexed_fields": ["Account"], }, diff --git a/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json b/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json index 8e6aae082..d01ef48e6 100644 --- a/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/kamevapi/cgrates/etc/cgrates/cgrates.json @@ -123,12 +123,8 @@ "suppliers": { "enabled": true, - "resources_conns": [ - {"address": "*internal"}, - ], - "stats_conns": [ - {"address": "*internal"}, - ], + "resources_conns": ["*internal"], + "stats_conns": ["*internal"], "string_indexed_fields": ["Account"], }, diff --git a/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json b/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json index de1c36c97..66a23166d 100644 --- a/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/osips/cgrates/etc/cgrates/cgrates.json @@ -116,12 +116,8 @@ "suppliers": { "enabled": true, - "resources_conns": [ - {"address": "*internal"} - ], - "stats_conns": [ - {"address": "*internal"} - ], + "resources_conns": ["*internal"], + "stats_conns": ["*internal"], "string_indexed_fields": ["Account"], "prefix_indexed_fields": ["Destination"], }, diff --git a/engine/suppliers.go b/engine/suppliers.go index b3ff13b13..2f6d870b1 100644 --- a/engine/suppliers.go +++ b/engine/suppliers.go @@ -20,7 +20,6 @@ package engine import ( "fmt" - "reflect" "sort" "strconv" "strings" @@ -28,7 +27,6 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) // Supplier defines supplier related information used within a SupplierProfile @@ -110,24 +108,12 @@ func (lps SupplierProfiles) Sort() { // NewSupplierService initializes the Supplier Service func NewSupplierService(dm *DataManager, - filterS *FilterS, cgrcfg *config.CGRConfig, resourceS, - statS, attributeS rpcclient.ClientConnector) (spS *SupplierService, err error) { - if attributeS != nil && reflect.ValueOf(attributeS).IsNil() { // fix nil value in interface - attributeS = nil - } - if resourceS != nil && reflect.ValueOf(resourceS).IsNil() { // fix nil value in interface - resourceS = nil - } - if statS != nil && reflect.ValueOf(statS).IsNil() { // fix nil value in interface - statS = nil - } + filterS *FilterS, cgrcfg *config.CGRConfig, connMgr *ConnManager) (spS *SupplierService, err error) { spS = &SupplierService{ - dm: dm, - filterS: filterS, - attributeS: attributeS, - resourceS: resourceS, - statS: statS, - cgrcfg: cgrcfg, + dm: dm, + filterS: filterS, + cgrcfg: cgrcfg, + connMgr: connMgr, } if spS.sorter, err = NewSupplierSortDispatcher(spS); err != nil { return nil, err @@ -137,13 +123,11 @@ func NewSupplierService(dm *DataManager, // SupplierService is the service computing Supplier queries type SupplierService struct { - dm *DataManager - filterS *FilterS - cgrcfg *config.CGRConfig - attributeS rpcclient.ClientConnector - resourceS rpcclient.ClientConnector - statS rpcclient.ClientConnector - sorter SupplierSortDispatcher + dm *DataManager + filterS *FilterS + cgrcfg *config.CGRConfig + sorter SupplierSortDispatcher + connMgr *ConnManager } // ListenAndServe will initialize the service @@ -309,10 +293,10 @@ func (spS *SupplierService) costForEvent(ev *utils.CGREvent, func (spS *SupplierService) statMetrics(statIDs []string, tenant string) (stsMetric map[string]float64, err error) { stsMetric = make(map[string]float64) provStsMetrics := make(map[string][]float64) - if spS.statS != nil { + if len(spS.cgrcfg.SupplierSCfg().StatSConns) != 0 { for _, statID := range statIDs { var metrics map[string]float64 - if err = spS.statS.Call(utils.StatSv1GetQueueFloatMetrics, + if err = spS.connMgr.Call(spS.cgrcfg.SupplierSCfg().StatSConns, utils.StatSv1GetQueueFloatMetrics, &utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: tenant, ID: statID}}, &metrics); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( @@ -338,12 +322,12 @@ func (spS *SupplierService) statMetrics(statIDs []string, tenant string) (stsMet // first metric found is always returned func (spS *SupplierService) statMetricsForLoadDistribution(statIDs []string, tenant string) (result float64, err error) { provStsMetrics := make(map[string][]float64) - if spS.statS != nil { + if len(spS.cgrcfg.SupplierSCfg().StatSConns) != 0 { for _, statID := range statIDs { // check if we get an ID in the following form (StatID:MetricID) statWithMetric := strings.Split(statID, utils.InInFieldSep) var metrics map[string]float64 - if err = spS.statS.Call(utils.StatSv1GetQueueFloatMetrics, + if err = spS.connMgr.Call(spS.cgrcfg.SupplierSCfg().StatSConns, utils.StatSv1GetQueueFloatMetrics, &utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: tenant, ID: statWithMetric[0]}}, &metrics); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( @@ -376,10 +360,10 @@ func (spS *SupplierService) statMetricsForLoadDistribution(statIDs []string, ten // resourceUsage returns sum of all resource usages out of list func (spS *SupplierService) resourceUsage(resIDs []string, tenant string) (tUsage float64, err error) { - if spS.resourceS != nil { + if len(spS.cgrcfg.SupplierSCfg().ResourceSConns) != 0 { for _, resID := range resIDs { var res Resource - if err = spS.resourceS.Call(utils.ResourceSv1GetResource, + if err = spS.connMgr.Call(spS.cgrcfg.SupplierSCfg().ResourceSConns, utils.ResourceSv1GetResource, &utils.TenantID{Tenant: tenant, ID: resID}, &res); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( @@ -593,14 +577,14 @@ func (spS *SupplierService) V1GetSuppliers(args *ArgsGetSuppliers, reply *Sorted } else if args.CGREvent.Event == nil { return utils.NewErrMandatoryIeMissing(utils.Event) } - if spS.attributeS != nil { + if len(spS.cgrcfg.SupplierSCfg().AttributeSConns) != 0 { attrArgs := &AttrArgsProcessEvent{ Context: utils.StringPointer(utils.MetaSuppliers), CGREvent: args.CGREvent, ArgDispatcher: args.ArgDispatcher, } var rplyEv AttrSProcessEventReply - if err := spS.attributeS.Call(utils.AttributeSv1ProcessEvent, + if err := spS.connMgr.Call(spS.cgrcfg.SupplierSCfg().AttributeSConns, utils.AttributeSv1ProcessEvent, attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 { args.CGREvent = rplyEv.CGREvent } else if err.Error() != utils.ErrNotFound.Error() { @@ -635,21 +619,3 @@ func (spS *SupplierService) V1GetSupplierProfilesForEvent(args *utils.CGREventWi *reply = sPs return } - -// SetAttributeSConnection sets the new connection to the attribute service -// only used on reload -func (spS *SupplierService) SetAttributeSConnection(attrS rpcclient.ClientConnector) { - spS.attributeS = attrS -} - -// SetStatSConnection sets the new connection to the stat service -// only used on reload -func (spS *SupplierService) SetStatSConnection(stS rpcclient.ClientConnector) { - spS.statS = stS -} - -// SetResourceSConnection sets the new connection to the resource service -// only used on reload -func (spS *SupplierService) SetResourceSConnection(rS rpcclient.ClientConnector) { - spS.resourceS = rS -} diff --git a/engine/suppliers_test.go b/engine/suppliers_test.go index 84501d999..f6cab43f3 100644 --- a/engine/suppliers_test.go +++ b/engine/suppliers_test.go @@ -303,8 +303,7 @@ func TestSuppliersPopulateSupplierService(t *testing.T) { defaultCfg.SupplierSCfg().StringIndexedFields = nil defaultCfg.SupplierSCfg().PrefixIndexedFields = nil splService, err = NewSupplierService(dmSPP, &FilterS{ - dm: dmSPP, - cfg: defaultCfg}, defaultCfg, nil, nil, nil) + dm: dmSPP, cfg: defaultCfg}, defaultCfg, nil) if err != nil { t.Errorf("Error: %+v", err) } diff --git a/services/suppliers.go b/services/suppliers.go index 9e2a42e47..10f476439 100644 --- a/services/suppliers.go +++ b/services/suppliers.go @@ -33,34 +33,28 @@ import ( // NewSupplierService returns the Supplier Service func NewSupplierService(cfg *config.CGRConfig, dm *DataDBService, cacheS *engine.CacheS, filterSChan chan *engine.FilterS, - server *utils.Server, attrsChan, stsChan, resChan, - dispatcherChan chan rpcclient.ClientConnector) servmanager.Service { + server *utils.Server, internalSupplierSChan chan rpcclient.RpcClientConnection, + connMgr *engine.ConnManager) servmanager.Service { return &SupplierService{ - connChan: make(chan rpcclient.ClientConnector, 1), - cfg: cfg, - dm: dm, - cacheS: cacheS, - filterSChan: filterSChan, - server: server, - attrsChan: attrsChan, - stsChan: stsChan, - resChan: resChan, - dispatcherChan: dispatcherChan, + connChan: internalSupplierSChan, + cfg: cfg, + dm: dm, + cacheS: cacheS, + filterSChan: filterSChan, + server: server, + connMgr: connMgr, } } // SupplierService implements Service interface type SupplierService struct { sync.RWMutex - cfg *config.CGRConfig - dm *DataDBService - cacheS *engine.CacheS - filterSChan chan *engine.FilterS - server *utils.Server - attrsChan chan rpcclient.ClientConnector - stsChan chan rpcclient.ClientConnector - resChan chan rpcclient.ClientConnector - dispatcherChan chan rpcclient.ClientConnector + cfg *config.CGRConfig + dm *DataDBService + cacheS *engine.CacheS + filterSChan chan *engine.FilterS + server *utils.Server + connMgr *engine.ConnManager splS *engine.SupplierService rpc *v1.SupplierSv1 @@ -79,30 +73,10 @@ func (splS *SupplierService) Start() (err error) { filterS := <-splS.filterSChan splS.filterSChan <- filterS - var attrSConn, resourceSConn, statSConn rpcclient.ClientConnector - - attrSConn, err = NewConnection(splS.cfg, splS.attrsChan, splS.dispatcherChan, splS.cfg.SupplierSCfg().AttributeSConns) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", - utils.SupplierS, utils.SupplierS, err.Error())) - return - } - statSConn, err = NewConnection(splS.cfg, splS.stsChan, splS.dispatcherChan, splS.cfg.SupplierSCfg().StatSConns) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", - utils.SupplierS, err.Error())) - return - } - resourceSConn, err = NewConnection(splS.cfg, splS.resChan, splS.dispatcherChan, splS.cfg.SupplierSCfg().ResourceSConns) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", - utils.SupplierS, err.Error())) - return - } splS.Lock() defer splS.Unlock() splS.splS, err = engine.NewSupplierService(splS.dm.GetDM(), filterS, splS.cfg, - resourceSConn, statSConn, attrSConn) + splS.connMgr) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.SupplierS, err.Error())) @@ -125,30 +99,6 @@ func (splS *SupplierService) GetIntenternalChan() (conn chan rpcclient.ClientCon // Reload handles the change of config func (splS *SupplierService) Reload() (err error) { - var attrSConn, resourceSConn, statSConn rpcclient.ClientConnector - attrSConn, err = NewConnection(splS.cfg, splS.attrsChan, splS.dispatcherChan, splS.cfg.SupplierSCfg().AttributeSConns) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", - utils.SupplierS, utils.SupplierS, err.Error())) - return - } - statSConn, err = NewConnection(splS.cfg, splS.stsChan, splS.dispatcherChan, splS.cfg.SupplierSCfg().StatSConns) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", - utils.SupplierS, err.Error())) - return - } - resourceSConn, err = NewConnection(splS.cfg, splS.resChan, splS.dispatcherChan, splS.cfg.SupplierSCfg().ResourceSConns) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", - utils.SupplierS, err.Error())) - return - } - splS.Lock() - splS.splS.SetAttributeSConnection(attrSConn) - splS.splS.SetStatSConnection(statSConn) - splS.splS.SetResourceSConnection(resourceSConn) - splS.Unlock() return } diff --git a/services/suppliers_it_test.go b/services/suppliers_it_test.go index 3fa015caa..dc6f2fc3c 100644 --- a/services/suppliers_it_test.go +++ b/services/suppliers_it_test.go @@ -53,7 +53,7 @@ func TestSupplierSReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg) sts := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), nil) - supS := NewSupplierService(cfg, db, chS, filterSChan, server, nil, sts.GetIntenternalChan(), nil, nil) + supS := NewSupplierService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), nil) srvMngr.AddServices(NewConnManagerService(cfg, nil), supS, sts, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db) if err = srvMngr.StartServices(); err != nil { t.Error(err)