From 038aa5f2ea3d82ee61963098bfdc884fe0f3d590 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Fri, 26 Feb 2021 16:24:38 +0200 Subject: [PATCH] Renamed DispatcherH in RegistarC --- cmd/cgr-engine/cgr-engine.go | 8 +- config/config.go | 61 +-- config/config_defaults.go | 2 +- config/config_json.go | 10 +- config/config_test.go | 36 +- config/configsanity.go | 65 +++- config/configsanity_test.go | 111 +++++- config/dispatcherhcfg.go | 145 -------- config/dispatcherhcfg_test.go | 269 -------------- config/httpcfg.go | 16 +- config/httpcfg_test.go | 26 +- config/libconfig_json.go | 20 +- config/registrarccfg.go | 138 +++++++ config/registrarccfg_test.go | 346 ++++++++++++++++++ config/rpcconn.go | 41 +++ .../dispatcherh/all2_mysql/cgrates.json | 22 +- engine/dispatcherprfl.go | 7 +- engine/libengine.go | 3 + engine/model_helpers.go | 10 +- engine/model_helpers_test.go | 12 +- integration_test.sh | 20 +- packages/debian/changelog | 1 + {dispatcherh => registrarc}/dispatcherh.go | 28 +- .../dispatcherh_it_test.go | 2 +- .../dispatcherh_test.go | 10 +- {dispatcherh => registrarc}/lib_test.go | 2 +- {dispatcherh => registrarc}/libdispatcherh.go | 101 +++-- .../libdispatcherh_test.go | 36 +- services/{dispatcherh.go => registrarc.go} | 24 +- ...cherh_it_test.go => registrarc_it_test.go} | 6 +- ...dispatcherh_test.go => registrarc_test.go} | 8 +- servmanager/servmanager.go | 4 +- utils/consts.go | 43 ++- utils/net.go | 2 +- 34 files changed, 966 insertions(+), 669 deletions(-) delete mode 100644 config/dispatcherhcfg.go delete mode 100644 config/dispatcherhcfg_test.go create mode 100644 config/registrarccfg.go create mode 100644 config/registrarccfg_test.go rename {dispatcherh => registrarc}/dispatcherh.go (73%) rename {dispatcherh => registrarc}/dispatcherh_it_test.go (99%) rename {dispatcherh => registrarc}/dispatcherh_test.go (91%) rename {dispatcherh => registrarc}/lib_test.go (98%) rename {dispatcherh => registrarc}/libdispatcherh.go (67%) rename {dispatcherh => registrarc}/libdispatcherh_test.go (92%) rename services/{dispatcherh.go => registrarc.go} (78%) rename services/{dispatcherh_it_test.go => registrarc_it_test.go} (95%) rename services/{dispatcherh_test.go => registrarc_test.go} (91%) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 684cc7956..87080852f 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -35,8 +35,8 @@ import ( "time" "github.com/cgrates/cgrates/cores" - "github.com/cgrates/cgrates/dispatcherh" "github.com/cgrates/cgrates/loaders" + "github.com/cgrates/cgrates/registrarc" v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/config" @@ -543,7 +543,7 @@ func main() { utils.CoreS: new(sync.WaitGroup), utils.DataDB: new(sync.WaitGroup), utils.DiameterAgent: new(sync.WaitGroup), - utils.DispatcherH: new(sync.WaitGroup), + utils.RegistrarC: new(sync.WaitGroup), utils.DispatcherS: new(sync.WaitGroup), utils.DNSAgent: new(sync.WaitGroup), utils.EventExporterS: new(sync.WaitGroup), @@ -592,7 +592,7 @@ func main() { // Rpc/http server server := cores.NewServer(caps) if len(cfg.HTTPCfg().DispatchersRegistrarURL) != 0 { - server.RegisterHttpFunc(cfg.HTTPCfg().DispatchersRegistrarURL, dispatcherh.Registar) + server.RegisterHttpFunc(cfg.HTTPCfg().DispatchersRegistrarURL, registrarc.Registrar) } if cfg.ConfigSCfg().Enabled { server.RegisterHttpFunc(cfg.ConfigSCfg().URL, config.HandlerConfigS) @@ -633,7 +633,7 @@ func main() { srvManager := servmanager.NewServiceManager(cfg, shdChan, shdWg) attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, anz, srvDep) dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager, anz, srvDep) - dspH := services.NewDispatcherHostsService(cfg, server, connManager, anz, srvDep) + dspH := services.NewRegistrarCService(cfg, server, connManager, anz, srvDep) chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server, internalChargerSChan, connManager, anz, srvDep) tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan, anz, srvDep) diff --git a/config/config.go b/config/config.go index e397319c6..3b536970a 100644 --- a/config/config.go +++ b/config/config.go @@ -182,8 +182,11 @@ func newCGRConfig(config []byte) (cfg *CGRConfig, err error) { cfg.routeSCfg = new(RouteSCfg) cfg.sureTaxCfg = new(SureTaxCfg) cfg.dispatcherSCfg = new(DispatcherSCfg) - cfg.dispatcherHCfg = new(DispatcherHCfg) - cfg.dispatcherHCfg.Hosts = make(map[string][]*DispatcherHRegistarCfg) + cfg.registrarCCfg = new(RegistrarCCfgs) + cfg.registrarCCfg.RPC = new(RegistrarCCfg) + cfg.registrarCCfg.Dispatcher = new(RegistrarCCfg) + cfg.registrarCCfg.RPC.Hosts = make(map[string][]*RemoteHost) + cfg.registrarCCfg.Dispatcher.Hosts = make(map[string][]*RemoteHost) cfg.loaderCgrCfg = new(LoaderCgrCfg) cfg.migratorCgrCfg = new(MigratorCgrCfg) cfg.migratorCgrCfg.OutDataDBOpts = make(map[string]interface{}) @@ -322,7 +325,7 @@ type CGRConfig struct { routeSCfg *RouteSCfg // RouteS config sureTaxCfg *SureTaxCfg // SureTax config dispatcherSCfg *DispatcherSCfg // DispatcherS config - dispatcherHCfg *DispatcherHCfg // DispatcherH config + registrarCCfg *RegistrarCCfgs // RegistrarC config loaderCgrCfg *LoaderCgrCfg // LoaderCgr config migratorCgrCfg *MigratorCgrCfg // MigratorCgr config mailerCfg *MailerCfg // Mailer config @@ -406,7 +409,7 @@ func (cfg *CGRConfig) loadFromJSONCfg(jsnCfg *CgrJsonCfg) (err error) { cfg.loadMailerCfg, cfg.loadSureTaxCfg, cfg.loadDispatcherSCfg, cfg.loadLoaderCgrCfg, cfg.loadMigratorCgrCfg, cfg.loadTLSCgrCfg, cfg.loadAnalyzerCgrCfg, cfg.loadApierCfg, cfg.loadErsCfg, cfg.loadEesCfg, - cfg.loadRateSCfg, cfg.loadSIPAgentCfg, cfg.loadDispatcherHCfg, + cfg.loadRateSCfg, cfg.loadSIPAgentCfg, cfg.loadRegistrarCCfg, cfg.loadConfigSCfg, cfg.loadAPIBanCgrCfg, cfg.loadCoreSCfg, cfg.loadActionSCfg, cfg.loadAccountSCfg} { if err = loadFunc(jsnCfg); err != nil { @@ -710,13 +713,13 @@ func (cfg *CGRConfig) loadDispatcherSCfg(jsnCfg *CgrJsonCfg) (err error) { return cfg.dispatcherSCfg.loadFromJSONCfg(jsnDispatcherSCfg) } -// loadDispatcherHCfg loads the DispatcherH section of the configuration -func (cfg *CGRConfig) loadDispatcherHCfg(jsnCfg *CgrJsonCfg) (err error) { - var jsnDispatcherHCfg *DispatcherHJsonCfg - if jsnDispatcherHCfg, err = jsnCfg.DispatcherHJsonCfg(); err != nil { +// loadRegistrarCCfg loads the DispatcherH section of the configuration +func (cfg *CGRConfig) loadRegistrarCCfg(jsnCfg *CgrJsonCfg) (err error) { + var jsnDispatcherHCfg *RegistrarCJsonCfgs + if jsnDispatcherHCfg, err = jsnCfg.RegistrarCJsonCfgs(); err != nil { return } - return cfg.dispatcherHCfg.loadFromJSONCfg(jsnDispatcherHCfg) + return cfg.registrarCCfg.loadFromJSONCfg(jsnDispatcherHCfg) } // loadLoaderCgrCfg loads the Loader section of the configuration @@ -1000,11 +1003,11 @@ func (cfg *CGRConfig) DispatcherSCfg() *DispatcherSCfg { return cfg.dispatcherSCfg } -// DispatcherHCfg returns the config for DispatcherH -func (cfg *CGRConfig) DispatcherHCfg() *DispatcherHCfg { +// RegistrarCCfg returns the config for RegistrarC +func (cfg *CGRConfig) RegistrarCCfg() *RegistrarCCfgs { cfg.lks[DispatcherSJson].Lock() defer cfg.lks[DispatcherSJson].Unlock() - return cfg.dispatcherHCfg + return cfg.registrarCCfg } // MigratorCgrCfg returns the config for Migrator @@ -1231,6 +1234,22 @@ func (cfg *CGRConfig) RUnlocks(lkIDs ...string) { } } +// LockSections will lock the given sections +// User needs to know what he is doing since this can panic +func (cfg *CGRConfig) LockSections(lkIDs ...string) { + for _, lkID := range lkIDs { + cfg.lks[lkID].Lock() + } +} + +// UnlockSections will unlock the given sections +// User needs to know what he is doing since this can panic +func (cfg *CGRConfig) UnlockSections(lkIDs ...string) { + for _, lkID := range lkIDs { + cfg.lks[lkID].Unlock() + } +} + func (cfg *CGRConfig) getLoadFunctions() map[string]func(*CgrJsonCfg) error { return map[string]func(*CgrJsonCfg) error{ GENERAL_JSN: cfg.loadGeneralCfg, @@ -1266,7 +1285,7 @@ func (cfg *CGRConfig) getLoadFunctions() map[string]func(*CgrJsonCfg) error { CgrLoaderCfgJson: cfg.loadLoaderCgrCfg, CgrMigratorCfgJson: cfg.loadMigratorCgrCfg, DispatcherSJson: cfg.loadDispatcherSCfg, - DispatcherHJson: cfg.loadDispatcherHCfg, + RegistrarCJson: cfg.loadRegistrarCCfg, AnalyzerCfgJson: cfg.loadAnalyzerCgrCfg, ApierS: cfg.loadApierCfg, RPCConnsJsonName: cfg.loadRPCConns, @@ -1529,8 +1548,8 @@ func (cfg *CGRConfig) reloadSections(sections ...string) { cfg.rldChans[SIPAgentJson] <- struct{}{} case RateSJson: cfg.rldChans[RateSJson] <- struct{}{} - case DispatcherHJson: - cfg.rldChans[DispatcherHJson] <- struct{}{} + case RegistrarCJson: + cfg.rldChans[RegistrarCJson] <- struct{}{} case AccountSCfgJson: cfg.rldChans[AccountSCfgJson] <- struct{}{} case ActionSJson: @@ -1572,7 +1591,7 @@ func (cfg *CGRConfig) AsMapInterface(separator string) (mp map[string]interface{ RouteSJson: cfg.routeSCfg.AsMapInterface(), SURETAX_JSON: cfg.sureTaxCfg.AsMapInterface(separator), DispatcherSJson: cfg.dispatcherSCfg.AsMapInterface(), - DispatcherHJson: cfg.dispatcherHCfg.AsMapInterface(), + RegistrarCJson: cfg.registrarCCfg.AsMapInterface(), CgrLoaderCfgJson: cfg.loaderCgrCfg.AsMapInterface(), CgrMigratorCfgJson: cfg.migratorCgrCfg.AsMapInterface(), MAILER_JSN: cfg.mailerCfg.AsMapInterface(), @@ -1716,8 +1735,8 @@ func (cfg *CGRConfig) V1GetConfig(args *SectionWithOpts, reply *map[string]inter mp = cfg.SureTaxCfg().AsMapInterface(cfg.GeneralCfg().RSRSep) case DispatcherSJson: mp = cfg.DispatcherSCfg().AsMapInterface() - case DispatcherHJson: - mp = cfg.DispatcherHCfg().AsMapInterface() + case RegistrarCJson: + mp = cfg.RegistrarCCfg().AsMapInterface() case LoaderJson: mp = cfg.LoaderCfg().AsMapInterface(cfg.GeneralCfg().RSRSep) case CgrLoaderCfgJson: @@ -1886,8 +1905,8 @@ func (cfg *CGRConfig) V1GetConfigAsJSON(args *SectionWithOpts, reply *string) (e mp = cfg.SureTaxCfg().AsMapInterface(cfg.GeneralCfg().RSRSep) case DispatcherSJson: mp = cfg.DispatcherSCfg().AsMapInterface() - case DispatcherHJson: - mp = cfg.DispatcherHCfg().AsMapInterface() + case RegistrarCJson: + mp = cfg.RegistrarCCfg().AsMapInterface() case LoaderJson: mp = cfg.LoaderCfg().AsMapInterface(cfg.GeneralCfg().RSRSep) case CgrLoaderCfgJson: @@ -2006,7 +2025,7 @@ func (cfg *CGRConfig) Clone() (cln *CGRConfig) { routeSCfg: cfg.routeSCfg.Clone(), sureTaxCfg: cfg.sureTaxCfg.Clone(), dispatcherSCfg: cfg.dispatcherSCfg.Clone(), - dispatcherHCfg: cfg.dispatcherHCfg.Clone(), + registrarCCfg: cfg.registrarCCfg.Clone(), loaderCgrCfg: cfg.loaderCgrCfg.Clone(), migratorCgrCfg: cfg.migratorCgrCfg.Clone(), mailerCfg: cfg.mailerCfg.Clone(), diff --git a/config/config_defaults.go b/config/config_defaults.go index ca322ac2b..b958a5941 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -66,7 +66,7 @@ const CGRATES_CFG_JSON = ` "rpc_conns": { "*localhost": { - "conns": [{"address": "127.0.0.1:2012", "transport":"*json"}], + "conns": [{"address": "127.0.0.1:2012", "transport":"*json"},{}], }, }, // rpc connections definitions diff --git a/config/config_json.go b/config/config_json.go index 67db797df..7610236bd 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -49,7 +49,7 @@ const ( MAILER_JSN = "mailer" SURETAX_JSON = "suretax" DispatcherSJson = "dispatchers" - DispatcherHJson = "dispatcherh" + RegistrarCJson = "registrarc" CgrLoaderCfgJson = "loader" CgrMigratorCfgJson = "migrator" ChargerSCfgJson = "chargers" @@ -75,7 +75,7 @@ var ( CACHE_JSN, FilterSjsn, RALS_JSN, CDRS_JSN, ERsJson, SessionSJson, AsteriskAgentJSN, FreeSWITCHAgentJSN, KamailioAgentJSN, DA_JSN, RA_JSN, HttpAgentJson, DNSAgentJson, ATTRIBUTE_JSN, ChargerSCfgJson, RESOURCES_JSON, STATS_JSON, THRESHOLDS_JSON, RouteSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson, - AnalyzerCfgJson, ApierS, EEsJson, RateSJson, SIPAgentJson, DispatcherHJson, TemplatesJson, ConfigSJson, APIBanCfgJson, CoreSCfgJson, + AnalyzerCfgJson, ApierS, EEsJson, RateSJson, SIPAgentJson, RegistrarCJson, TemplatesJson, ConfigSJson, APIBanCfgJson, CoreSCfgJson, ActionSJson, AccountSCfgJson} ) @@ -443,12 +443,12 @@ func (self CgrJsonCfg) DispatcherSJsonCfg() (*DispatcherSJsonCfg, error) { return cfg, nil } -func (self CgrJsonCfg) DispatcherHJsonCfg() (*DispatcherHJsonCfg, error) { - rawCfg, hasKey := self[DispatcherHJson] +func (self CgrJsonCfg) RegistrarCJsonCfgs() (*RegistrarCJsonCfgs, error) { + rawCfg, hasKey := self[RegistrarCJson] if !hasKey { return nil, nil } - cfg := new(DispatcherHJsonCfg) + cfg := new(RegistrarCJsonCfgs) if err := json.Unmarshal(*rawCfg, cfg); err != nil { return nil, err } diff --git a/config/config_test.go b/config/config_test.go index 4190bebf6..1f2e3e5c6 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -1659,7 +1659,7 @@ func TestLoadDispatcherHCfgError(t *testing.T) { } if cgrCfgJSON, err := NewCgrJsonCfgFromBytes([]byte(cfgJSONStr)); err != nil { t.Error(err) - } else if err := cgrConfig.loadDispatcherHCfg(cgrCfgJSON); err == nil || err.Error() != expected { + } else if err := cgrConfig.loadRegistrarCCfg(cgrCfgJSON); err == nil || err.Error() != expected { t.Errorf("Expected %+v, received %+v", expected, err) } } @@ -4045,13 +4045,13 @@ func TestV1GetConfigHTTP(t *testing.T) { var reply map[string]interface{} expected := map[string]interface{}{ HTTP_JSN: map[string]interface{}{ - utils.HTTPJsonRPCURLCfg: "/jsonrpc", - utils.DispatchersRegistrarURLCfg: "/dispatchers_registrar", - utils.HTTPWSURLCfg: "/ws", - utils.HTTPFreeswitchCDRsURLCfg: "/freeswitch_json", - utils.HTTPCDRsURLCfg: "/cdr_http", - utils.HTTPUseBasicAuthCfg: false, - utils.HTTPAuthUsersCfg: map[string]string{}, + utils.HTTPJsonRPCURLCfg: "/jsonrpc", + utils.RegistrarSURLCfg: "/dispatchers_registrar", + utils.HTTPWSURLCfg: "/ws", + utils.HTTPFreeswitchCDRsURLCfg: "/freeswitch_json", + utils.HTTPCDRsURLCfg: "/cdr_http", + utils.HTTPUseBasicAuthCfg: false, + utils.HTTPAuthUsersCfg: map[string]string{}, utils.HTTPClientOptsCfg: map[string]interface{}{ utils.HTTPClientTLSClientConfigCfg: false, utils.HTTPClientTLSHandshakeTimeoutCfg: "10s", @@ -4601,15 +4601,15 @@ func TestV1GetConfigDispatcherS(t *testing.T) { func TestV1GetConfigDispatcherH(t *testing.T) { var reply map[string]interface{} expected := map[string]interface{}{ - DispatcherHJson: map[string]interface{}{ - utils.EnabledCfg: false, - utils.DispatchersConnsCfg: []string{}, - utils.HostsCfg: map[string][]map[string]interface{}{}, - utils.RegisterIntervalCfg: "5m0s", + RegistrarCJson: map[string]interface{}{ + utils.EnabledCfg: false, + utils.RegistrarsConnsCfg: []string{}, + utils.HostsCfg: map[string][]map[string]interface{}{}, + utils.RefreshIntervalCfg: "5m0s", }, } cfgCgr := NewDefaultCGRConfig() - if err := cfgCgr.V1GetConfig(&SectionWithOpts{Section: DispatcherHJson}, &reply); err != nil { + if err := cfgCgr.V1GetConfig(&SectionWithOpts{Section: RegistrarCJson}, &reply); err != nil { t.Error(err) } else if !reflect.DeepEqual(reply, expected) { t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(expected), utils.ToJSON(reply)) @@ -5393,7 +5393,7 @@ func TestV1GetConfigAsJSONDispatcherH(t *testing.T) { var reply string expected := `{"dispatcherh":{"dispatchers_conns":[],"enabled":false,"hosts":{},"register_interval":"5m0s"}}` cgrCfg := NewDefaultCGRConfig() - if err := cgrCfg.V1GetConfigAsJSON(&SectionWithOpts{Section: DispatcherHJson}, &reply); err != nil { + if err := cgrCfg.V1GetConfigAsJSON(&SectionWithOpts{Section: RegistrarCJson}, &reply); err != nil { t.Error(err) } else if expected != reply { t.Errorf("Expected %+v \n, received %+v", expected, reply) @@ -5916,7 +5916,7 @@ func TestReloadSections(t *testing.T) { for _, section := range []string{RPCConnsJsonName, HTTP_JSN, SCHEDULER_JSN, RALS_JSN, CDRS_JSN, ERsJson, SessionSJson, AsteriskAgentJSN, FreeSWITCHAgentJSN, KamailioAgentJSN, DA_JSN, RA_JSN, HttpAgentJson, DNSAgentJson, ATTRIBUTE_JSN, ChargerSCfgJson, RESOURCES_JSON, STATS_JSON, THRESHOLDS_JSON, RouteSJson, - LoaderJson, DispatcherSJson, ApierS, EEsJson, SIPAgentJson, RateSJson, DispatcherHJson, AnalyzerCfgJson} { + LoaderJson, DispatcherSJson, ApierS, EEsJson, SIPAgentJson, RateSJson, RegistrarCJson, AnalyzerCfgJson} { for _, section := range sortedCfgSections { cfgCgr.rldChans[section] = make(chan struct{}, 1) } @@ -6123,8 +6123,8 @@ func TestCGRConfigClone(t *testing.T) { if !reflect.DeepEqual(cfg.dispatcherSCfg, rcv.dispatcherSCfg) { t.Errorf("Expected: %+v\nReceived: %+v", utils.ToJSON(cfg.dispatcherSCfg), utils.ToJSON(rcv.dispatcherSCfg)) } - if !reflect.DeepEqual(cfg.dispatcherHCfg, rcv.dispatcherHCfg) { - t.Errorf("Expected: %+v\nReceived: %+v", utils.ToJSON(cfg.dispatcherHCfg), utils.ToJSON(rcv.dispatcherHCfg)) + if !reflect.DeepEqual(cfg.registrarCCfg, rcv.registrarCCfg) { + t.Errorf("Expected: %+v\nReceived: %+v", utils.ToJSON(cfg.registrarCCfg), utils.ToJSON(rcv.registrarCCfg)) } if !reflect.DeepEqual(cfg.loaderCgrCfg, rcv.loaderCgrCfg) { t.Errorf("Expected: %+v\nReceived: %+v", utils.ToJSON(cfg.loaderCgrCfg), utils.ToJSON(rcv.loaderCgrCfg)) diff --git a/config/configsanity.go b/config/configsanity.go index 3886f3d87..d8234af5c 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -718,39 +718,74 @@ func (cfg *CGRConfig) checkConfigSanity() error { } } - if cfg.dispatcherHCfg.Enabled { - if len(cfg.dispatcherHCfg.Hosts) == 0 { - return fmt.Errorf("<%s> missing dispatcher host IDs", utils.DispatcherH) + if cfg.registrarCCfg.Dispatcher.Enabled { + if len(cfg.registrarCCfg.Dispatcher.Hosts) == 0 { + return fmt.Errorf("<%s> missing dispatcher host IDs", utils.RegistrarC) } - if cfg.dispatcherHCfg.RegisterInterval <= 0 { - return fmt.Errorf("<%s> the register imterval needs to be bigger than 0", utils.DispatcherH) + if cfg.registrarCCfg.Dispatcher.RefreshInterval <= 0 { + return fmt.Errorf("<%s> the register imterval needs to be bigger than 0", utils.RegistrarC) } - for tnt, hosts := range cfg.dispatcherHCfg.Hosts { + for tnt, hosts := range cfg.registrarCCfg.Dispatcher.Hosts { for _, host := range hosts { - if !utils.SliceHasMember([]string{utils.MetaGOB, rpcclient.HTTPjson, utils.MetaJSON}, host.RegisterTransport) { - return fmt.Errorf("<%s> unsupported transport <%s> for host <%s>", utils.DispatcherH, host.RegisterTransport, utils.ConcatenatedKey(tnt, host.ID)) + if !utils.SliceHasMember([]string{utils.MetaGOB, rpcclient.HTTPjson, utils.MetaJSON, rpcclient.BiRPCJSON, rpcclient.BiRPCGOB}, host.Transport) { + return fmt.Errorf("<%s> unsupported transport <%s> for host <%s>", utils.RegistrarC, host.Transport, utils.ConcatenatedKey(tnt, host.ID)) } } } - if len(cfg.dispatcherHCfg.DispatchersConns) == 0 { - return fmt.Errorf("<%s> missing dispatcher connection IDs", utils.DispatcherH) + if len(cfg.registrarCCfg.Dispatcher.RegistrarSConns) == 0 { + return fmt.Errorf("<%s> missing dispatcher connection IDs", utils.RegistrarC) } - for _, connID := range cfg.dispatcherHCfg.DispatchersConns { + for _, connID := range cfg.registrarCCfg.Dispatcher.RegistrarSConns { if connID == utils.MetaInternal { - return fmt.Errorf("<%s> internal connection IDs are not supported", utils.DispatcherH) + return fmt.Errorf("<%s> internal connection IDs are not supported", utils.RegistrarC) } connCfg, has := cfg.rpcConns[connID] if !has { - return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.DispatcherH, connID) + return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.RegistrarC, connID) } if len(connCfg.Conns) != 1 { - return fmt.Errorf("<%s> connection with id: <%s> needs to have only one host", utils.DispatcherH, connID) + return fmt.Errorf("<%s> connection with id: <%s> needs to have only one host", utils.RegistrarC, connID) } if connCfg.Conns[0].Transport != rpcclient.HTTPjson { - return fmt.Errorf("<%s> connection with id: <%s> unsupported transport <%s>", utils.DispatcherH, connID, connCfg.Conns[0].Transport) + return fmt.Errorf("<%s> connection with id: <%s> unsupported transport <%s>", utils.RegistrarC, connID, connCfg.Conns[0].Transport) } } } + + if cfg.registrarCCfg.RPC.Enabled { + if len(cfg.registrarCCfg.RPC.Hosts) == 0 { + return fmt.Errorf("<%s> missing RPC host IDs", utils.RegistrarC) + } + if cfg.registrarCCfg.RPC.RefreshInterval <= 0 { + return fmt.Errorf("<%s> the register imterval needs to be bigger than 0", utils.RegistrarC) + } + for tnt, hosts := range cfg.registrarCCfg.RPC.Hosts { + for _, host := range hosts { + if !utils.SliceHasMember([]string{utils.MetaGOB, rpcclient.HTTPjson, utils.MetaJSON, rpcclient.BiRPCJSON, rpcclient.BiRPCGOB}, host.Transport) { + return fmt.Errorf("<%s> unsupported transport <%s> for host <%s>", utils.RegistrarC, host.Transport, utils.ConcatenatedKey(tnt, host.ID)) + } + } + } + if len(cfg.registrarCCfg.RPC.RegistrarSConns) == 0 { + return fmt.Errorf("<%s> missing RPC connection IDs", utils.RegistrarC) + } + for _, connID := range cfg.registrarCCfg.RPC.RegistrarSConns { + if connID == utils.MetaInternal { + return fmt.Errorf("<%s> internal connection IDs are not supported", utils.RegistrarC) + } + connCfg, has := cfg.rpcConns[connID] + if !has { + return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.RegistrarC, connID) + } + if len(connCfg.Conns) != 1 { + return fmt.Errorf("<%s> connection with id: <%s> needs to have only one host", utils.RegistrarC, connID) + } + if connCfg.Conns[0].Transport != rpcclient.HTTPjson { + return fmt.Errorf("<%s> connection with id: <%s> unsupported transport <%s>", utils.RegistrarC, connID, connCfg.Conns[0].Transport) + } + } + } + if cfg.analyzerSCfg.Enabled { if _, err := os.Stat(cfg.analyzerSCfg.DBPath); err != nil && os.IsNotExist(err) { return fmt.Errorf("<%s> nonexistent DB folder: %q", utils.AnalyzerS, cfg.analyzerSCfg.DBPath) diff --git a/config/configsanity_test.go b/config/configsanity_test.go index 36926dfc3..cdf1d6c72 100644 --- a/config/configsanity_test.go +++ b/config/configsanity_test.go @@ -1044,60 +1044,62 @@ func TestConfigSanityCache(t *testing.T) { } } -func TestConfigSanityDispatcherH(t *testing.T) { +func TestConfigSanityRegistrarCRPC(t *testing.T) { cfg := NewDefaultCGRConfig() - cfg.dispatcherHCfg = &DispatcherHCfg{ - Enabled: true, - Hosts: map[string][]*DispatcherHRegistarCfg{ - "hosts": {}, + cfg.registrarCCfg = &RegistrarCCfgs{ + RPC: &RegistrarCCfg{ + Enabled: true, + Hosts: map[string][]*RemoteHost{ + "hosts": {}, + }, }, } - expected := " the register imterval needs to be bigger than 0" + expected := " the register imterval needs to be bigger than 0" if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } - cfg.dispatcherHCfg.Hosts = nil - expected = " missing dispatcher host IDs" + cfg.registrarCCfg.RPC.Hosts = nil + expected = " missing RPC host IDs" if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } - cfg.dispatcherHCfg.RegisterInterval = 2 - cfg.dispatcherHCfg.Hosts = map[string][]*DispatcherHRegistarCfg{ + cfg.registrarCCfg.RPC.RefreshInterval = 2 + cfg.registrarCCfg.RPC.Hosts = map[string][]*RemoteHost{ "hosts": { { ID: "randomID", }, }, } - expected = " unsupported transport <> for host " + expected = " unsupported transport <> for host " if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } - cfg.dispatcherHCfg.Hosts["hosts"][0].RegisterTransport = utils.MetaJSON - expected = " missing dispatcher connection IDs" + cfg.registrarCCfg.RPC.Hosts["hosts"][0].Transport = utils.MetaJSON + expected = " missing RPC connection IDs" if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } - cfg.dispatcherHCfg.DispatchersConns = []string{utils.MetaInternal} - expected = " internal connection IDs are not supported" + cfg.registrarCCfg.RPC.RegistrarSConns = []string{utils.MetaInternal} + expected = " internal connection IDs are not supported" if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } - cfg.dispatcherHCfg.DispatchersConns = []string{utils.MetaLocalHost} - expected = " connection with id: <*localhost> unsupported transport <*json>" + cfg.registrarCCfg.RPC.RegistrarSConns = []string{utils.MetaLocalHost} + expected = " connection with id: <*localhost> unsupported transport <*json>" if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } - cfg.dispatcherHCfg.DispatchersConns = []string{"*conn1"} - expected = " connection with id: <*conn1> not defined" + cfg.registrarCCfg.RPC.RegistrarSConns = []string{"*conn1"} + expected = " connection with id: <*conn1> not defined" if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } @@ -1106,7 +1108,76 @@ func TestConfigSanityDispatcherH(t *testing.T) { utils.MetaLocalHost: {}, "*conn1": {}, } - expected = " connection with id: <*conn1> needs to have only one host" + expected = " connection with id: <*conn1> needs to have only one host" + if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { + t.Errorf("Expecting: %+q received: %+q", expected, err) + } +} +func TestConfigSanityRegistrarCDispatcher(t *testing.T) { + cfg := NewDefaultCGRConfig() + + cfg.registrarCCfg = &RegistrarCCfgs{ + Dispatcher: &RegistrarCCfg{ + Enabled: true, + Hosts: map[string][]*RemoteHost{ + "hosts": {}, + }, + }, + } + + expected := " the register imterval needs to be bigger than 0" + if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { + t.Errorf("Expecting: %+q received: %+q", expected, err) + } + + cfg.registrarCCfg.Dispatcher.Hosts = nil + expected = " missing dispatcher host IDs" + if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { + t.Errorf("Expecting: %+q received: %+q", expected, err) + } + + cfg.registrarCCfg.Dispatcher.RefreshInterval = 2 + cfg.registrarCCfg.Dispatcher.Hosts = map[string][]*RemoteHost{ + "hosts": { + { + ID: "randomID", + }, + }, + } + expected = " unsupported transport <> for host " + if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { + t.Errorf("Expecting: %+q received: %+q", expected, err) + } + + cfg.registrarCCfg.Dispatcher.Hosts["hosts"][0].Transport = utils.MetaJSON + expected = " missing dispatcher connection IDs" + if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { + t.Errorf("Expecting: %+q received: %+q", expected, err) + } + + cfg.registrarCCfg.Dispatcher.RegistrarSConns = []string{utils.MetaInternal} + expected = " internal connection IDs are not supported" + if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { + t.Errorf("Expecting: %+q received: %+q", expected, err) + } + + cfg.registrarCCfg.Dispatcher.RegistrarSConns = []string{utils.MetaLocalHost} + expected = " connection with id: <*localhost> unsupported transport <*json>" + if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { + t.Errorf("Expecting: %+q received: %+q", expected, err) + } + + cfg.registrarCCfg.Dispatcher.RegistrarSConns = []string{"*conn1"} + expected = " connection with id: <*conn1> not defined" + if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { + t.Errorf("Expecting: %+q received: %+q", expected, err) + } + + cfg.rpcConns = RPCConns{ + utils.MetaLocalHost: {}, + "*conn1": {}, + } + expected = " connection with id: <*conn1> needs to have only one host" if err := cfg.CheckConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } diff --git a/config/dispatcherhcfg.go b/config/dispatcherhcfg.go deleted file mode 100644 index eac9b0569..000000000 --- a/config/dispatcherhcfg.go +++ /dev/null @@ -1,145 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package config - -import ( - "time" - - "github.com/cgrates/cgrates/utils" -) - -// DispatcherHCfg is the configuration of dispatcher hosts -type DispatcherHCfg struct { - Enabled bool - DispatchersConns []string - Hosts map[string][]*DispatcherHRegistarCfg - RegisterInterval time.Duration -} - -func (dps *DispatcherHCfg) loadFromJSONCfg(jsnCfg *DispatcherHJsonCfg) (err error) { - if jsnCfg == nil { - return nil - } - if jsnCfg.Enabled != nil { - dps.Enabled = *jsnCfg.Enabled - } - if jsnCfg.Dispatchers_conns != nil { - dps.DispatchersConns = make([]string, len(*jsnCfg.Dispatchers_conns)) - copy(dps.DispatchersConns, *jsnCfg.Dispatchers_conns) - } - if jsnCfg.Hosts != nil { - for tnt, hosts := range jsnCfg.Hosts { - for _, hostJSON := range hosts { - dps.Hosts[tnt] = append(dps.Hosts[tnt], NewDispatcherHRegistarCfg(hostJSON)) - } - } - } - if jsnCfg.Register_interval != nil { - if dps.RegisterInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Register_interval); err != nil { - return - } - } - return -} - -// AsMapInterface returns the config as a map[string]interface{} -func (dps *DispatcherHCfg) AsMapInterface() (initialMP map[string]interface{}) { - initialMP = map[string]interface{}{ - utils.EnabledCfg: dps.Enabled, - utils.DispatchersConnsCfg: dps.DispatchersConns, - utils.RegisterIntervalCfg: dps.RegisterInterval.String(), - } - if dps.RegisterInterval == 0 { - initialMP[utils.RegisterIntervalCfg] = "0" - } - if dps.Hosts != nil { - hosts := make(map[string][]map[string]interface{}) - for tnt, hs := range dps.Hosts { - for _, h := range hs { - hosts[tnt] = append(hosts[tnt], h.AsMapInterface()) - } - } - initialMP[utils.HostsCfg] = hosts - } - return -} - -// Clone returns a deep copy of DispatcherHCfg -func (dps DispatcherHCfg) Clone() (cln *DispatcherHCfg) { - cln = &DispatcherHCfg{ - Enabled: dps.Enabled, - RegisterInterval: dps.RegisterInterval, - Hosts: make(map[string][]*DispatcherHRegistarCfg), - } - if dps.DispatchersConns != nil { - cln.DispatchersConns = make([]string, len(dps.DispatchersConns)) - for i, k := range dps.DispatchersConns { - cln.DispatchersConns[i] = k - } - } - for tnt, hosts := range dps.Hosts { - clnH := make([]*DispatcherHRegistarCfg, len(hosts)) - for i, host := range hosts { - clnH[i] = host.Clone() - } - cln.Hosts[tnt] = clnH - } - return -} - -// DispatcherHRegistarCfg describe the DispatcherHost that will be registered -type DispatcherHRegistarCfg struct { - ID string - RegisterTransport string - RegisterTLS bool -} - -// NewDispatcherHRegistarCfg returns a new DispatcherHRegistarCfg based on the config from json -func NewDispatcherHRegistarCfg(jsnCfg DispatcherHRegistarJsonCfg) (dhr *DispatcherHRegistarCfg) { - dhr = new(DispatcherHRegistarCfg) - if jsnCfg.Id != nil { - dhr.ID = *jsnCfg.Id - } - dhr.RegisterTransport = utils.MetaJSON - if jsnCfg.Register_transport != nil { - dhr.RegisterTransport = *jsnCfg.Register_transport - } - if jsnCfg.Register_tls != nil { - dhr.RegisterTLS = *jsnCfg.Register_tls - } - return -} - -// AsMapInterface returns the config as a map[string]interface{} -func (dhr *DispatcherHRegistarCfg) AsMapInterface() map[string]interface{} { - return map[string]interface{}{ - utils.IDCfg: dhr.ID, - utils.RegisterTransportCfg: dhr.RegisterTransport, - utils.RegisterTLSCfg: dhr.RegisterTLS, - } -} - -// Clone returns a deep copy of DispatcherHRegistarCfg -func (dhr DispatcherHRegistarCfg) Clone() (cln *DispatcherHRegistarCfg) { - return &DispatcherHRegistarCfg{ - ID: dhr.ID, - RegisterTransport: dhr.RegisterTransport, - RegisterTLS: dhr.RegisterTLS, - } -} diff --git a/config/dispatcherhcfg_test.go b/config/dispatcherhcfg_test.go deleted file mode 100644 index 13fd84358..000000000 --- a/config/dispatcherhcfg_test.go +++ /dev/null @@ -1,269 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package config - -import ( - "reflect" - "testing" - - "github.com/cgrates/cgrates/utils" -) - -func TestDispatcherHCfgloadFromJsonCfg(t *testing.T) { - jsonCfg := &DispatcherHJsonCfg{ - Enabled: utils.BoolPointer(true), - Dispatchers_conns: &[]string{"*conn1", "*conn2"}, - Hosts: map[string][]DispatcherHRegistarJsonCfg{ - utils.MetaDefault: { - { - Id: utils.StringPointer("Host1"), - Register_transport: utils.StringPointer(utils.MetaJSON), - }, - { - Id: utils.StringPointer("Host2"), - Register_transport: utils.StringPointer(utils.MetaGOB), - }, - }, - "cgrates.net": { - { - Id: utils.StringPointer("Host1"), - Register_transport: utils.StringPointer(utils.MetaJSON), - Register_tls: utils.BoolPointer(true), - }, - { - Id: utils.StringPointer("Host2"), - Register_transport: utils.StringPointer(utils.MetaGOB), - Register_tls: utils.BoolPointer(true), - }, - }, - }, - Register_interval: utils.StringPointer("5"), - } - expected := &DispatcherHCfg{ - Enabled: true, - DispatchersConns: []string{"*conn1", "*conn2"}, - Hosts: map[string][]*DispatcherHRegistarCfg{ - utils.MetaDefault: { - { - ID: "Host1", - RegisterTransport: utils.MetaJSON, - }, - { - ID: "Host2", - RegisterTransport: utils.MetaGOB, - }, - }, - "cgrates.net": { - { - ID: "Host1", - RegisterTransport: utils.MetaJSON, - RegisterTLS: true, - }, - { - ID: "Host2", - RegisterTransport: utils.MetaGOB, - RegisterTLS: true, - }, - }, - }, - RegisterInterval: 5, - } - jsnCfg := NewDefaultCGRConfig() - if err = jsnCfg.dispatcherHCfg.loadFromJSONCfg(jsonCfg); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(expected, jsnCfg.dispatcherHCfg) { - t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(expected), utils.ToJSON(jsnCfg.dispatcherHCfg)) - } -} - -func TestDispatcherHCfgAsMapInterface(t *testing.T) { - cfgJSONStr := `{ - "dispatcherh":{ - "enabled": true, - "dispatchers_conns": ["*conn1","*conn2"], - "hosts": { - "*default": [ - { - "ID": "Host1", - "register_transport": "*json", - "register_tls": false - }, - { - "ID": "Host2", - "register_transport": "*gob", - "register_tls": false - } - ] - }, - "register_interval": "0", - }, -}` - eMap := map[string]interface{}{ - utils.EnabledCfg: true, - utils.DispatchersConnsCfg: []string{"*conn1", "*conn2"}, - utils.HostsCfg: map[string][]map[string]interface{}{ - utils.MetaDefault: { - { - utils.IDCfg: "Host1", - utils.RegisterTransportCfg: "*json", - utils.RegisterTLSCfg: false, - }, - { - utils.IDCfg: "Host2", - utils.RegisterTransportCfg: "*gob", - utils.RegisterTLSCfg: false, - }, - }, - }, - utils.RegisterIntervalCfg: "0", - } - if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil { - t.Error(err) - } else if rcv := cgrCfg.dispatcherHCfg.AsMapInterface(); !reflect.DeepEqual(eMap, rcv) { - t.Errorf("Expected %+v, received %+v", utils.ToJSON(eMap), utils.ToJSON(rcv)) - } -} - -func TestDispatcherCfgParseWithNanoSec(t *testing.T) { - jsonCfg := &DispatcherHJsonCfg{ - Register_interval: utils.StringPointer("1ss"), - } - expErrMessage := "time: unknown unit \"ss\" in duration \"1ss\"" - jsnCfg := NewDefaultCGRConfig() - if err = jsnCfg.dispatcherHCfg.loadFromJSONCfg(jsonCfg); err == nil || err.Error() != expErrMessage { - t.Errorf("Expected %+v \n, recevied %+v", expErrMessage, err) - } -} - -func TestDispatcherHCfgAsMapInterface1(t *testing.T) { - cfgJSONStr := `{ - "dispatcherh":{ - "enabled": true, - "dispatchers_conns":["conn1"], - "hosts": { - "*default": [ - { - "ID":"", - "register_transport": "*json", - "register_tls":false, - }, - { - "ID":"host2", - "register_transport": "", - "register_tls":true, - }, - ] - }, - "register_interval": "1m", - }, - -}` - eMap := map[string]interface{}{ - utils.EnabledCfg: true, - utils.DispatchersConnsCfg: []string{"conn1"}, - utils.HostsCfg: map[string][]map[string]interface{}{ - utils.MetaDefault: { - { - utils.IDCfg: utils.EmptyString, - utils.RegisterTransportCfg: utils.MetaJSON, - utils.RegisterTLSCfg: false, - }, - { - utils.IDCfg: "host2", - utils.RegisterTransportCfg: utils.EmptyString, - utils.RegisterTLSCfg: true, - }, - }, - }, - utils.RegisterIntervalCfg: "1m0s", - } - if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil { - t.Error(err) - } else { - rcv := cgrCfg.dispatcherHCfg.AsMapInterface() - if !reflect.DeepEqual(eMap[utils.HostsCfg].(map[string][]map[string]interface{})[utils.IDCfg], - rcv[utils.HostsCfg].(map[string][]map[string]interface{})[utils.IDCfg]) { - t.Errorf("Expected %+v, received %+v", eMap[utils.HostsCfg].(map[string][]map[string]interface{})[utils.IDCfg], - rcv[utils.HostsCfg].(map[string][]map[string]interface{})[utils.IDCfg]) - } else if !reflect.DeepEqual(eMap[utils.HostsCfg], rcv[utils.HostsCfg]) { - t.Errorf("Expected %+v, received %+v", eMap[utils.HostsCfg], rcv[utils.HostsCfg]) - } else if !reflect.DeepEqual(eMap, rcv) { - t.Errorf("Expected %+v, received %+v", eMap, rcv) - } - } -} - -func TestDispatcherHCfgAsMapInterface2(t *testing.T) { - cfgJSONStr := `{ - "dispatcherh": {}, -}` - eMap := map[string]interface{}{ - utils.EnabledCfg: false, - utils.DispatchersConnsCfg: []string{}, - utils.HostsCfg: map[string][]map[string]interface{}{}, - utils.RegisterIntervalCfg: "5m0s", - } - if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil { - t.Error(err) - } else if rcv := cgrCfg.dispatcherHCfg.AsMapInterface(); !reflect.DeepEqual(eMap, rcv) { - t.Errorf("Expected %+v, received %+v", eMap, rcv) - } -} - -func TestDispatcherHCfgClone(t *testing.T) { - ban := &DispatcherHCfg{ - Enabled: true, - DispatchersConns: []string{"*conn1", "*conn2"}, - Hosts: map[string][]*DispatcherHRegistarCfg{ - utils.MetaDefault: { - { - ID: "Host1", - RegisterTransport: utils.MetaJSON, - }, - { - ID: "Host2", - RegisterTransport: utils.MetaGOB, - }, - }, - "cgrates.net": { - { - ID: "Host1", - RegisterTransport: utils.MetaJSON, - RegisterTLS: true, - }, - { - ID: "Host2", - RegisterTransport: utils.MetaGOB, - RegisterTLS: true, - }, - }, - }, - RegisterInterval: 5, - } - rcv := ban.Clone() - if !reflect.DeepEqual(ban, rcv) { - t.Errorf("Expected: %+v\nReceived: %+v", utils.ToJSON(ban), utils.ToJSON(rcv)) - } - if rcv.DispatchersConns[0] = ""; ban.DispatchersConns[0] != "*conn1" { - t.Errorf("Expected clone to not modify the cloned") - } - if rcv.Hosts[utils.MetaDefault][0].ID = ""; ban.Hosts[utils.MetaDefault][0].ID != "Host1" { - t.Errorf("Expected clone to not modify the cloned") - } -} diff --git a/config/httpcfg.go b/config/httpcfg.go index f5b8e6e6c..25c101307 100644 --- a/config/httpcfg.go +++ b/config/httpcfg.go @@ -75,14 +75,14 @@ func (httpcfg *HTTPCfg) AsMapInterface() map[string]interface{} { clientOpts[k] = v } return map[string]interface{}{ - utils.HTTPJsonRPCURLCfg: httpcfg.HTTPJsonRPCURL, - utils.DispatchersRegistrarURLCfg: httpcfg.DispatchersRegistrarURL, - utils.HTTPWSURLCfg: httpcfg.HTTPWSURL, - utils.HTTPFreeswitchCDRsURLCfg: httpcfg.HTTPFreeswitchCDRsURL, - utils.HTTPCDRsURLCfg: httpcfg.HTTPCDRsURL, - utils.HTTPUseBasicAuthCfg: httpcfg.HTTPUseBasicAuth, - utils.HTTPAuthUsersCfg: httpcfg.HTTPAuthUsers, - utils.HTTPClientOptsCfg: clientOpts, + utils.HTTPJsonRPCURLCfg: httpcfg.HTTPJsonRPCURL, + utils.RegistrarSURLCfg: httpcfg.DispatchersRegistrarURL, + utils.HTTPWSURLCfg: httpcfg.HTTPWSURL, + utils.HTTPFreeswitchCDRsURLCfg: httpcfg.HTTPFreeswitchCDRsURL, + utils.HTTPCDRsURLCfg: httpcfg.HTTPCDRsURL, + utils.HTTPUseBasicAuthCfg: httpcfg.HTTPUseBasicAuth, + utils.HTTPAuthUsersCfg: httpcfg.HTTPAuthUsers, + utils.HTTPClientOptsCfg: clientOpts, } } diff --git a/config/httpcfg_test.go b/config/httpcfg_test.go index 5881bf8a8..cc9f7a60c 100644 --- a/config/httpcfg_test.go +++ b/config/httpcfg_test.go @@ -72,13 +72,13 @@ func TestHTTPCfgAsMapInterface(t *testing.T) { "http": {}, }` eMap := map[string]interface{}{ - utils.HTTPJsonRPCURLCfg: "/jsonrpc", - utils.DispatchersRegistrarURLCfg: "/dispatchers_registrar", - utils.HTTPWSURLCfg: "/ws", - utils.HTTPFreeswitchCDRsURLCfg: "/freeswitch_json", - utils.HTTPCDRsURLCfg: "/cdr_http", - utils.HTTPUseBasicAuthCfg: false, - utils.HTTPAuthUsersCfg: map[string]string{}, + utils.HTTPJsonRPCURLCfg: "/jsonrpc", + utils.RegistrarSURLCfg: "/dispatchers_registrar", + utils.HTTPWSURLCfg: "/ws", + utils.HTTPFreeswitchCDRsURLCfg: "/freeswitch_json", + utils.HTTPCDRsURLCfg: "/cdr_http", + utils.HTTPUseBasicAuthCfg: false, + utils.HTTPAuthUsersCfg: map[string]string{}, utils.HTTPClientOptsCfg: map[string]interface{}{ utils.HTTPClientTLSClientConfigCfg: false, utils.HTTPClientTLSHandshakeTimeoutCfg: "10s", @@ -113,12 +113,12 @@ func TestHTTPCfgAsMapInterface1(t *testing.T) { }, }` eMap := map[string]interface{}{ - utils.HTTPJsonRPCURLCfg: "/rpc", - utils.DispatchersRegistrarURLCfg: "/dispatchers_registrar", - utils.HTTPWSURLCfg: "", - utils.HTTPFreeswitchCDRsURLCfg: "/freeswitch_json", - utils.HTTPCDRsURLCfg: "/cdr_http", - utils.HTTPUseBasicAuthCfg: true, + utils.HTTPJsonRPCURLCfg: "/rpc", + utils.RegistrarSURLCfg: "/dispatchers_registrar", + utils.HTTPWSURLCfg: "", + utils.HTTPFreeswitchCDRsURLCfg: "/freeswitch_json", + utils.HTTPCDRsURLCfg: "/cdr_http", + utils.HTTPUseBasicAuthCfg: true, utils.HTTPAuthUsersCfg: map[string]string{ "user1": "authenticated", "user2": "authenticated", diff --git a/config/libconfig_json.go b/config/libconfig_json.go index c576bbcac..83c7821f1 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -270,6 +270,7 @@ type RPCConnsJson struct { // Represents one connection instance towards a rater/cdrs server type RemoteHostJson struct { + Id *string Address *string Transport *string Synchronous *bool @@ -534,19 +535,16 @@ type DispatcherSJsonCfg struct { Attributes_conns *[]string } -type DispatcherHJsonCfg struct { - Enabled *bool - Dispatchers_conns *[]string - Hosts map[string][]DispatcherHRegistarJsonCfg - Register_interval *string - Register_transport *string - Register_tls *bool +type RegistrarCJsonCfg struct { + Enabled *bool + Registrars_conns *[]string + Hosts map[string][]*RemoteHostJson + Refresh_interval *string } -type DispatcherHRegistarJsonCfg struct { - Id *string - Register_transport *string - Register_tls *bool +type RegistrarCJsonCfgs struct { + RPC *RegistrarCJsonCfg + Dispatcher *RegistrarCJsonCfg } type LoaderCfgJson struct { diff --git a/config/registrarccfg.go b/config/registrarccfg.go new file mode 100644 index 000000000..e28cbc560 --- /dev/null +++ b/config/registrarccfg.go @@ -0,0 +1,138 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package config + +import ( + "time" + + "github.com/cgrates/cgrates/utils" +) + +// RegistrarCCfgs is the configuration of registrarc rpc and dispatcher +type RegistrarCCfgs struct { + RPC *RegistrarCCfg + Dispatcher *RegistrarCCfg +} + +func (dps *RegistrarCCfgs) loadFromJSONCfg(jsnCfg *RegistrarCJsonCfgs) (err error) { + if jsnCfg == nil { + return nil + } + if err = dps.RPC.loadFromJSONCfg(jsnCfg.RPC); err != nil { + return + } + return dps.Dispatcher.loadFromJSONCfg(jsnCfg.Dispatcher) +} + +// AsMapInterface returns the config as a map[string]interface{} +func (dps *RegistrarCCfgs) AsMapInterface() (initialMP map[string]interface{}) { + return map[string]interface{}{ + utils.RPCCfg: dps.RPC.AsMapInterface(), + utils.DispatcherCfg: dps.Dispatcher.AsMapInterface(), + } +} + +// Clone returns a deep copy of DispatcherHCfg +func (dps RegistrarCCfgs) Clone() (cln *RegistrarCCfgs) { + return &RegistrarCCfgs{ + RPC: dps.RPC.Clone(), + Dispatcher: dps.Dispatcher.Clone(), + } +} + +// RegistrarCCfg is the configuration of registrarc +type RegistrarCCfg struct { + Enabled bool + RegistrarSConns []string + Hosts map[string][]*RemoteHost + RefreshInterval time.Duration +} + +func (dps *RegistrarCCfg) loadFromJSONCfg(jsnCfg *RegistrarCJsonCfg) (err error) { + if jsnCfg == nil { + return nil + } + if jsnCfg.Enabled != nil { + dps.Enabled = *jsnCfg.Enabled + } + if jsnCfg.Registrars_conns != nil { + dps.RegistrarSConns = make([]string, len(*jsnCfg.Registrars_conns)) + copy(dps.RegistrarSConns, *jsnCfg.Registrars_conns) + } + if jsnCfg.Hosts != nil { + for tnt, hosts := range jsnCfg.Hosts { + for _, hostJSON := range hosts { + conn := NewDfltRemoteHost() + conn.loadFromJSONCfg(hostJSON) + dps.Hosts[tnt] = append(dps.Hosts[tnt], conn) + } + } + } + if jsnCfg.Refresh_interval != nil { + if dps.RefreshInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Refresh_interval); err != nil { + return + } + } + return +} + +// AsMapInterface returns the config as a map[string]interface{} +func (dps *RegistrarCCfg) AsMapInterface() (initialMP map[string]interface{}) { + initialMP = map[string]interface{}{ + utils.EnabledCfg: dps.Enabled, + utils.RegistrarsConnsCfg: dps.RegistrarSConns, + utils.RefreshIntervalCfg: dps.RefreshInterval.String(), + } + if dps.RefreshInterval == 0 { + initialMP[utils.RefreshIntervalCfg] = "0" + } + if dps.Hosts != nil { + hosts := make(map[string][]map[string]interface{}) + for tnt, hs := range dps.Hosts { + for _, h := range hs { + hosts[tnt] = append(hosts[tnt], h.AsMapInterface()) + } + } + initialMP[utils.HostsCfg] = hosts + } + return +} + +// Clone returns a deep copy of DispatcherHCfg +func (dps RegistrarCCfg) Clone() (cln *RegistrarCCfg) { + cln = &RegistrarCCfg{ + Enabled: dps.Enabled, + RefreshInterval: dps.RefreshInterval, + Hosts: make(map[string][]*RemoteHost), + } + if dps.RegistrarSConns != nil { + cln.RegistrarSConns = make([]string, len(dps.RegistrarSConns)) + for i, k := range dps.RegistrarSConns { + cln.RegistrarSConns[i] = k + } + } + for tnt, hosts := range dps.Hosts { + clnH := make([]*RemoteHost, len(hosts)) + for i, host := range hosts { + clnH[i] = host.Clone() + } + cln.Hosts[tnt] = clnH + } + return +} diff --git a/config/registrarccfg_test.go b/config/registrarccfg_test.go new file mode 100644 index 000000000..85adbc091 --- /dev/null +++ b/config/registrarccfg_test.go @@ -0,0 +1,346 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package config + +import ( + "reflect" + "testing" + + "github.com/cgrates/cgrates/utils" +) + +func TestDispatcherHCfgloadFromJsonCfg(t *testing.T) { + jsonCfg := &RegistrarCJsonCfgs{ + RPC: &RegistrarCJsonCfg{ + Enabled: utils.BoolPointer(true), + Registrars_conns: &[]string{"*conn1", "*conn2"}, + Hosts: map[string][]*RemoteHostJson{ + utils.MetaDefault: { + { + Id: utils.StringPointer("Host1"), + Transport: utils.StringPointer(utils.MetaJSON), + }, + { + Id: utils.StringPointer("Host2"), + Transport: utils.StringPointer(utils.MetaGOB), + }, + }, + "cgrates.net": { + { + Id: utils.StringPointer("Host1"), + Transport: utils.StringPointer(utils.MetaJSON), + Tls: utils.BoolPointer(true), + }, + { + Id: utils.StringPointer("Host2"), + Transport: utils.StringPointer(utils.MetaGOB), + Tls: utils.BoolPointer(true), + }, + }, + }, + Refresh_interval: utils.StringPointer("5"), + }, + Dispatcher: &RegistrarCJsonCfg{ + Enabled: utils.BoolPointer(true), + Registrars_conns: &[]string{"*conn1", "*conn2"}, + Hosts: map[string][]*RemoteHostJson{ + utils.MetaDefault: { + { + Id: utils.StringPointer("Host1"), + Transport: utils.StringPointer(utils.MetaJSON), + }, + { + Id: utils.StringPointer("Host2"), + Transport: utils.StringPointer(utils.MetaGOB), + }, + }, + "cgrates.net": { + { + Id: utils.StringPointer("Host1"), + Transport: utils.StringPointer(utils.MetaJSON), + Tls: utils.BoolPointer(true), + }, + { + Id: utils.StringPointer("Host2"), + Transport: utils.StringPointer(utils.MetaGOB), + Tls: utils.BoolPointer(true), + }, + }, + }, + Refresh_interval: utils.StringPointer("5"), + }, + } + expected := &RegistrarCCfgs{ + RPC: &RegistrarCCfg{ + Enabled: true, + RegistrarSConns: []string{"*conn1", "*conn2"}, + Hosts: map[string][]*RemoteHost{ + utils.MetaDefault: { + { + ID: "Host1", + Transport: utils.MetaJSON, + }, + { + ID: "Host2", + Transport: utils.MetaGOB, + }, + }, + "cgrates.net": { + { + ID: "Host1", + Transport: utils.MetaJSON, + TLS: true, + }, + { + ID: "Host2", + Transport: utils.MetaGOB, + TLS: true, + }, + }, + }, + RefreshInterval: 5, + }, + Dispatcher: &RegistrarCCfg{ + Enabled: true, + RegistrarSConns: []string{"*conn1", "*conn2"}, + Hosts: map[string][]*RemoteHost{ + utils.MetaDefault: { + { + ID: "Host1", + Transport: utils.MetaJSON, + }, + { + ID: "Host2", + Transport: utils.MetaGOB, + }, + }, + "cgrates.net": { + { + ID: "Host1", + Transport: utils.MetaJSON, + TLS: true, + }, + { + ID: "Host2", + Transport: utils.MetaGOB, + TLS: true, + }, + }, + }, + RefreshInterval: 5, + }, + } + jsnCfg := NewDefaultCGRConfig() + if err = jsnCfg.registrarCCfg.loadFromJSONCfg(jsonCfg); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expected, jsnCfg.registrarCCfg) { + t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(expected), utils.ToJSON(jsnCfg.registrarCCfg)) + } +} + +func TestDispatcherHCfgAsMapInterface(t *testing.T) { + cfgJSONStr := `{ + "dispatcherh":{ + "enabled": true, + "dispatchers_conns": ["*conn1","*conn2"], + "hosts": { + "*default": [ + { + "ID": "Host1", + "register_transport": "*json", + "register_tls": false + }, + { + "ID": "Host2", + "register_transport": "*gob", + "register_tls": false + } + ] + }, + "register_interval": "0", + }, +}` + eMap := map[string]interface{}{ + utils.EnabledCfg: true, + utils.RegistrarsConnsCfg: []string{"*conn1", "*conn2"}, + utils.HostsCfg: map[string][]map[string]interface{}{ + utils.MetaDefault: { + { + utils.IDCfg: "Host1", + utils.TransportCfg: "*json", + utils.TLS: false, + }, + { + utils.IDCfg: "Host2", + utils.TransportCfg: "*gob", + utils.TLS: false, + }, + }, + }, + utils.RefreshIntervalCfg: "0", + } + if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil { + t.Error(err) + } else if rcv := cgrCfg.registrarCCfg.AsMapInterface(); !reflect.DeepEqual(eMap, rcv) { + t.Errorf("Expected %+v, received %+v", utils.ToJSON(eMap), utils.ToJSON(rcv)) + } +} + +func TestDispatcherCfgParseWithNanoSec(t *testing.T) { + jsonCfg := &RegistrarCJsonCfgs{ + RPC: &RegistrarCJsonCfg{ + Refresh_interval: utils.StringPointer("1ss"), + }, + } + expErrMessage := "time: unknown unit \"ss\" in duration \"1ss\"" + jsnCfg := NewDefaultCGRConfig() + if err = jsnCfg.registrarCCfg.loadFromJSONCfg(jsonCfg); err == nil || err.Error() != expErrMessage { + t.Errorf("Expected %+v \n, recevied %+v", expErrMessage, err) + } +} + +func TestDispatcherCfgParseWithNanoSec2(t *testing.T) { + jsonCfg := &RegistrarCJsonCfgs{ + Dispatcher: &RegistrarCJsonCfg{ + Refresh_interval: utils.StringPointer("1ss"), + }, + } + expErrMessage := "time: unknown unit \"ss\" in duration \"1ss\"" + jsnCfg := NewDefaultCGRConfig() + if err = jsnCfg.registrarCCfg.loadFromJSONCfg(jsonCfg); err == nil || err.Error() != expErrMessage { + t.Errorf("Expected %+v \n, recevied %+v", expErrMessage, err) + } +} + +func TestDispatcherHCfgAsMapInterface1(t *testing.T) { + cfgJSONStr := `{ + "dispatcherh":{ + "enabled": true, + "dispatchers_conns":["conn1"], + "hosts": { + "*default": [ + { + "ID":"", + "register_transport": "*json", + "register_tls":false, + }, + { + "ID":"host2", + "register_transport": "", + "register_tls":true, + }, + ] + }, + "register_interval": "1m", + }, + +}` + eMap := map[string]interface{}{ + utils.EnabledCfg: true, + utils.RegistrarsConnsCfg: []string{"conn1"}, + utils.HostsCfg: map[string][]map[string]interface{}{ + utils.MetaDefault: { + { + utils.IDCfg: utils.EmptyString, + utils.TransportCfg: utils.MetaJSON, + utils.TLS: false, + }, + { + utils.IDCfg: "host2", + utils.TransportCfg: utils.EmptyString, + utils.TLS: true, + }, + }, + }, + utils.RefreshIntervalCfg: "1m0s", + } + if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil { + t.Error(err) + } else { + rcv := cgrCfg.registrarCCfg.AsMapInterface() + if !reflect.DeepEqual(eMap[utils.HostsCfg].(map[string][]map[string]interface{})[utils.IDCfg], + rcv[utils.HostsCfg].(map[string][]map[string]interface{})[utils.IDCfg]) { + t.Errorf("Expected %+v, received %+v", eMap[utils.HostsCfg].(map[string][]map[string]interface{})[utils.IDCfg], + rcv[utils.HostsCfg].(map[string][]map[string]interface{})[utils.IDCfg]) + } else if !reflect.DeepEqual(eMap[utils.HostsCfg], rcv[utils.HostsCfg]) { + t.Errorf("Expected %+v, received %+v", eMap[utils.HostsCfg], rcv[utils.HostsCfg]) + } else if !reflect.DeepEqual(eMap, rcv) { + t.Errorf("Expected %+v, received %+v", eMap, rcv) + } + } +} + +func TestDispatcherHCfgAsMapInterface2(t *testing.T) { + cfgJSONStr := `{ + "dispatcherh": {}, +}` + eMap := map[string]interface{}{ + utils.EnabledCfg: false, + utils.RegistrarsConnsCfg: []string{}, + utils.HostsCfg: map[string][]map[string]interface{}{}, + utils.RefreshIntervalCfg: "5m0s", + } + if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil { + t.Error(err) + } else if rcv := cgrCfg.registrarCCfg.AsMapInterface(); !reflect.DeepEqual(eMap, rcv) { + t.Errorf("Expected %+v, received %+v", eMap, rcv) + } +} + +func TestDispatcherHCfgClone(t *testing.T) { + ban := &RegistrarCCfg{ + Enabled: true, + RegistrarSConns: []string{"*conn1", "*conn2"}, + Hosts: map[string][]*RemoteHost{ + utils.MetaDefault: { + { + ID: "Host1", + Transport: utils.MetaJSON, + }, + { + ID: "Host2", + Transport: utils.MetaGOB, + }, + }, + "cgrates.net": { + { + ID: "Host1", + Transport: utils.MetaJSON, + TLS: true, + }, + { + ID: "Host2", + Transport: utils.MetaGOB, + TLS: true, + }, + }, + }, + RefreshInterval: 5, + } + rcv := ban.Clone() + if !reflect.DeepEqual(ban, rcv) { + t.Errorf("Expected: %+v\nReceived: %+v", utils.ToJSON(ban), utils.ToJSON(rcv)) + } + if rcv.RegistrarSConns[0] = ""; ban.RegistrarSConns[0] != "*conn1" { + t.Errorf("Expected clone to not modify the cloned") + } + if rcv.Hosts[utils.MetaDefault][0].ID = ""; ban.Hosts[utils.MetaDefault][0].ID != "Host1" { + t.Errorf("Expected clone to not modify the cloned") + } +} diff --git a/config/rpcconn.go b/config/rpcconn.go index 9d7a991b6..f37004ea5 100644 --- a/config/rpcconn.go +++ b/config/rpcconn.go @@ -118,6 +118,7 @@ func (rC RPCConn) Clone() (cln *RPCConn) { // RemoteHost connection config type RemoteHost struct { + ID string Address string Transport string Synchronous bool @@ -128,6 +129,9 @@ func (rh *RemoteHost) loadFromJSONCfg(jsnCfg *RemoteHostJson) { if jsnCfg == nil { return } + if jsnCfg.Id != nil { + rh.ID = *jsnCfg.Id + } if jsnCfg.Address != nil { rh.Address = *jsnCfg.Address } @@ -146,6 +150,7 @@ func (rh *RemoteHost) loadFromJSONCfg(jsnCfg *RemoteHostJson) { // AsMapInterface returns the config as a map[string]interface{} func (rh *RemoteHost) AsMapInterface() map[string]interface{} { return map[string]interface{}{ + utils.IDCfg: rh.ID, utils.AddressCfg: rh.Address, utils.TransportCfg: rh.Transport, utils.SynchronousCfg: rh.Synchronous, @@ -162,3 +167,39 @@ func (rh RemoteHost) Clone() (cln *RemoteHost) { TLS: rh.TLS, } } + +// UpdateRPCCons will parse each conn and update only +// the conns that have the same ID +func UpdateRPCCons(rpcConns RPCConns, newHosts map[string]*RemoteHost) (connIDs utils.StringSet) { + connIDs = make(utils.StringSet) + for rpcKey, rpcPool := range rpcConns { + for _, rh := range rpcPool.Conns { + newHost, has := newHosts[rh.ID] + if !has { + continue + } + connIDs.Add(rpcKey) + rh.Address = newHost.Address + rh.Transport = newHost.Transport + rh.Synchronous = newHost.Synchronous + rh.TLS = newHost.TLS + } + } + return +} + +// RemoveRPCCons will parse each conn and reset only +// the conns that have the same ID +func RemoveRPCCons(rpcConns RPCConns, newHosts utils.StringSet) { + for _, rpcPool := range rpcConns { + for _, rh := range rpcPool.Conns { + if !newHosts.Has(rh.ID) { + continue + } + rh.Address = "" + rh.Transport = "" + rh.Synchronous = false + rh.TLS = false + } + } +} diff --git a/data/conf/samples/dispatcherh/all2_mysql/cgrates.json b/data/conf/samples/dispatcherh/all2_mysql/cgrates.json index b9372ecaa..1ce9d6b8d 100644 --- a/data/conf/samples/dispatcherh/all2_mysql/cgrates.json +++ b/data/conf/samples/dispatcherh/all2_mysql/cgrates.json @@ -10,7 +10,7 @@ "listen": { - "rpc_json": ":7012", + "rpc_json": ":7012", "rpc_gob": ":7013", "http": ":7080", }, @@ -102,16 +102,24 @@ }, - - "dispatcherh":{ +"registrarc":{ + "rpc":{ "enabled": true, - "dispatchers_conns": ["dispConn"], + "registrars_conns": ["dispConn"], "hosts": { - "*default":[{"ID":"ALL2", "register_transport": "*json", "register_tls": false}] + "*default":[{"ID":"ALL2", "transport": "*json", "tls": false}] }, - "register_interval": "1s", + "refresh_interval": "1s", }, - + "dispatcher":{ + "enabled": true, + "registrars_conns": ["dispConn"], + "hosts": { + "*default":[{"ID":"ALL2", "transport": "*json", "tls": false}] + }, + "refresh_interval": "1s", + }, +} } \ No newline at end of file diff --git a/engine/dispatcherprfl.go b/engine/dispatcherprfl.go index 95a3e1491..055d2fa02 100644 --- a/engine/dispatcherprfl.go +++ b/engine/dispatcherprfl.go @@ -133,9 +133,8 @@ func (dps DispatcherProfiles) Sort() { // DispatcherHost represents one virtual host used by dispatcher type DispatcherHost struct { - Tenant string - ID string - Conn *config.RemoteHost + Tenant string + *config.RemoteHost rpcConn rpcclient.ClientConnector } @@ -155,7 +154,7 @@ func (dH *DispatcherHost) Call(serviceMethod string, args interface{}, reply int if dH.rpcConn == nil { // connect the rpcConn cfg := config.CgrConfig() - if dH.rpcConn, err = NewRPCConnection(dH.Conn, + if dH.rpcConn, err = NewRPCConnection(dH.RemoteHost, cfg.TLSCfg().ClientKey, cfg.TLSCfg().ClientCerificate, cfg.TLSCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, diff --git a/engine/libengine.go b/engine/libengine.go index fc9da5bc8..96300e4d4 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -37,6 +37,9 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA var atLestOneConnected bool // If one connected we don't longer return errors rpcPool = rpcclient.NewRPCPool(dispatchStrategy, replyTimeout) for _, rpcConnCfg := range rpcConnCfgs { + if rpcConnCfg.Address == utils.EmptyString { + continue + } rpcClient, err = NewRPCConnection(rpcConnCfg, keyPath, certPath, caPath, connAttempts, reconnects, connectTimeout, replyTimeout, internalConnChan, lazyConnect, biRPCClient) if err == rpcclient.ErrUnsupportedCodec { diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 0ec9b86ee..ceeb72703 100644 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -2869,8 +2869,8 @@ func APItoDispatcherHost(tpDPH *utils.TPDispatcherHost) (dpp *DispatcherHost) { } return &DispatcherHost{ Tenant: tpDPH.Tenant, - ID: tpDPH.ID, - Conn: &config.RemoteHost{ + RemoteHost: &config.RemoteHost{ + ID: tpDPH.ID, Address: tpDPH.Conn.Address, Transport: tpDPH.Conn.Transport, TLS: tpDPH.Conn.TLS, @@ -2883,9 +2883,9 @@ func DispatcherHostToAPI(dph *DispatcherHost) (tpDPH *utils.TPDispatcherHost) { Tenant: dph.Tenant, ID: dph.ID, Conn: &utils.TPDispatcherHostConn{ - Address: dph.Conn.Address, - Transport: dph.Conn.Transport, - TLS: dph.Conn.TLS, + Address: dph.Address, + Transport: dph.Transport, + TLS: dph.TLS, }, } } diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index cb0c9db68..7643008ff 100644 --- a/engine/model_helpers_test.go +++ b/engine/model_helpers_test.go @@ -3919,8 +3919,8 @@ func TestAPItoDispatcherHost(t *testing.T) { eOut := &DispatcherHost{ Tenant: "Tenant1", - ID: "ID1", - Conn: &config.RemoteHost{ + RemoteHost: &config.RemoteHost{ + ID: "ID1", Address: "Address1", Transport: "*json", }, @@ -3940,8 +3940,8 @@ func TestAPItoDispatcherHost(t *testing.T) { } eOut = &DispatcherHost{ Tenant: "Tenant2", - ID: "ID2", - Conn: &config.RemoteHost{ + RemoteHost: &config.RemoteHost{ + ID: "ID2", Address: "Address1", Transport: "*json", TLS: true, @@ -3955,8 +3955,8 @@ func TestAPItoDispatcherHost(t *testing.T) { func TestDispatcherHostToAPI(t *testing.T) { dph := &DispatcherHost{ Tenant: "Tenant1", - ID: "ID1", - Conn: &config.RemoteHost{ + RemoteHost: &config.RemoteHost{ + ID: "ID1", Address: "Address1", Transport: "*json", TLS: true, diff --git a/integration_test.sh b/integration_test.sh index 9bf376ffa..d83c6d0f5 100755 --- a/integration_test.sh +++ b/integration_test.sh @@ -35,8 +35,8 @@ results+=($?) echo "go test github.com/cgrates/cgrates/dispatchers -tags=integration $@" go test github.com/cgrates/cgrates/dispatchers -tags=integration $@ results+=($?) -echo "go test github.com/cgrates/cgrates/dispatcherh -tags=integration $@" -go test github.com/cgrates/cgrates/dispatcherh -tags=integration $@ +echo "go test github.com/cgrates/cgrates/registrarc -tags=integration $@" +go test github.com/cgrates/cgrates/registrarc -tags=integration $@ results+=($?) echo "go test github.com/cgrates/cgrates/apier/v1 -tags=offline $@" go test github.com/cgrates/cgrates/apier/v1 -tags=offline $@ @@ -73,8 +73,8 @@ results+=($?) echo 'go test github.com/cgrates/cgrates/dispatchers -tags=integration -dbtype=*internal' go test github.com/cgrates/cgrates/dispatchers -tags=integration -dbtype=*internal results+=($?) -echo "go test github.com/cgrates/cgrates/dispatcherh -tags=integration -dbtype=*internal" -go test github.com/cgrates/cgrates/dispatcherh -tags=integration -dbtype=*internal +echo "go test github.com/cgrates/cgrates/registrarc -tags=integration -dbtype=*internal" +go test github.com/cgrates/cgrates/registrarc -tags=integration -dbtype=*internal results+=($?) echo 'go test github.com/cgrates/cgrates/apier/v1 -tags=offline -dbtype=*internal' go test github.com/cgrates/cgrates/apier/v1 -tags=offline -dbtype=*internal @@ -110,8 +110,8 @@ results+=($?) echo 'go test github.com/cgrates/cgrates/dispatchers -tags=integration -dbtype=*mysql' go test github.com/cgrates/cgrates/dispatchers -tags=integration -dbtype=*mysql results+=($?) -echo "go test github.com/cgrates/cgrates/dispatcherh -tags=integration -dbtype=*mysql" -go test github.com/cgrates/cgrates/dispatcherh -tags=integration -dbtype=*mysql +echo "go test github.com/cgrates/cgrates/registrarc -tags=integration -dbtype=*mysql" +go test github.com/cgrates/cgrates/registrarc -tags=integration -dbtype=*mysql results+=($?) echo 'go test github.com/cgrates/cgrates/apier/v1 -tags=offline -dbtype=*mysql' go test github.com/cgrates/cgrates/apier/v1 -tags=offline -dbtype=*mysql @@ -147,8 +147,8 @@ results+=($?) echo 'go test github.com/cgrates/cgrates/dispatchers -tags=integration -dbtype=*mongo' go test github.com/cgrates/cgrates/dispatchers -tags=integration -dbtype=*mongo results+=($?) -echo "go test github.com/cgrates/cgrates/dispatcherh -tags=integration -dbtype=*mongo" -go test github.com/cgrates/cgrates/dispatcherh -tags=integration -dbtype=*mongo +echo "go test github.com/cgrates/cgrates/registrarc -tags=integration -dbtype=*mongo" +go test github.com/cgrates/cgrates/registrarc -tags=integration -dbtype=*mongo results+=($?) echo 'go test github.com/cgrates/cgrates/apier/v1 -tags=offline -dbtype=*mongo' go test github.com/cgrates/cgrates/apier/v1 -tags=offline -dbtype=*mongo @@ -184,8 +184,8 @@ results+=($?) echo 'go test github.com/cgrates/cgrates/dispatchers -tags=integration -dbtype=*postgres' go test github.com/cgrates/cgrates/dispatchers -tags=integration -dbtype=*postgres results+=($?) -echo "go test github.com/cgrates/cgrates/dispatcherh -tags=integration -dbtype=*postgres" -go test github.com/cgrates/cgrates/dispatcherh -tags=integration -dbtype=*postgres +echo "go test github.com/cgrates/cgrates/registrarc -tags=integration -dbtype=*postgres" +go test github.com/cgrates/cgrates/registrarc -tags=integration -dbtype=*postgres results+=($?) echo 'go test github.com/cgrates/cgrates/apier/v1 -tags=offline -dbtype=*postgres' go test github.com/cgrates/cgrates/apier/v1 -tags=offline -dbtype=*postgres diff --git a/packages/debian/changelog b/packages/debian/changelog index 16e4935a4..6159204e2 100644 --- a/packages/debian/changelog +++ b/packages/debian/changelog @@ -146,6 +146,7 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium * [SessionS] Replaced max_call_duration config with default_usage for each ToR * [SessionS] Added JSON and GOB BiRPC support * [ActionS] Added *add_balance, *set_balance and *rem_balance + * [RegistrarC] Renamed DispatcherH to RegistrarC -- DanB Wed, 19 Feb 2020 13:25:52 +0200 diff --git a/dispatcherh/dispatcherh.go b/registrarc/dispatcherh.go similarity index 73% rename from dispatcherh/dispatcherh.go rename to registrarc/dispatcherh.go index c277f2271..26b8fe044 100644 --- a/dispatcherh/dispatcherh.go +++ b/registrarc/dispatcherh.go @@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package dispatcherh +package registrarc import ( "fmt" @@ -27,8 +27,8 @@ import ( "github.com/cgrates/cgrates/utils" ) -// NewDispatcherHService constructs a DispatcherHService -func NewDispatcherHService(cfg *config.CGRConfig, +// NewRegistrarCService constructs a DispatcherHService +func NewRegistrarCService(cfg *config.CGRConfig, connMgr *engine.ConnManager) *DispatcherHostsService { return &DispatcherHostsService{ cfg: cfg, @@ -51,22 +51,22 @@ func (dhS *DispatcherHostsService) ListenAndServe(stopChan chan struct{}) { select { case <-stopChan: return - case <-time.After(dhS.cfg.DispatcherHCfg().RegisterInterval): + case <-time.After(dhS.cfg.RegistrarCCfg().Dispatcher.RefreshInterval): } } } // Shutdown is called to shutdown the service func (dhS *DispatcherHostsService) Shutdown() { - utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.DispatcherH)) + utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.RegistrarC)) dhS.unregisterHosts() - utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.DispatcherH)) + utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.RegistrarC)) return } func (dhS *DispatcherHostsService) registerHosts() { - for _, connID := range dhS.cfg.DispatcherHCfg().DispatchersConns { - for tnt, hostCfgs := range dhS.cfg.DispatcherHCfg().Hosts { + for _, connID := range dhS.cfg.RegistrarCCfg().Dispatcher.RegistrarSConns { + for tnt, hostCfgs := range dhS.cfg.RegistrarCCfg().Dispatcher.Hosts { if tnt == utils.MetaDefault { tnt = dhS.cfg.GeneralCfg().DefaultTenant } @@ -75,9 +75,9 @@ func (dhS *DispatcherHostsService) registerHosts() { continue } var rply string - if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1RegisterHosts, args, &rply); err != nil { + if err := dhS.connMgr.Call([]string{connID}, nil, utils.RegistrarSv1RegisterDispatcherHosts, args, &rply); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s", - utils.DispatcherH, connID, err)) + utils.RegistrarC, connID, err)) continue } } @@ -87,14 +87,14 @@ func (dhS *DispatcherHostsService) registerHosts() { func (dhS *DispatcherHostsService) unregisterHosts() { var rply string - for _, connID := range dhS.cfg.DispatcherHCfg().DispatchersConns { - for tnt, hostCfgs := range dhS.cfg.DispatcherHCfg().Hosts { + for _, connID := range dhS.cfg.RegistrarCCfg().Dispatcher.RegistrarSConns { + for tnt, hostCfgs := range dhS.cfg.RegistrarCCfg().Dispatcher.Hosts { if tnt == utils.MetaDefault { tnt = dhS.cfg.GeneralCfg().DefaultTenant } - if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1UnregisterHosts, NewUnregisterArgs(tnt, hostCfgs), &rply); err != nil { + if err := dhS.connMgr.Call([]string{connID}, nil, utils.RegistrarSv1UnregisterDispatcherHosts, NewUnregisterArgs(tnt, hostCfgs), &rply); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts with tenant<%s> to the conn with ID <%s> because : %s", - utils.DispatcherH, tnt, connID, err)) + utils.RegistrarC, tnt, connID, err)) continue } } diff --git a/dispatcherh/dispatcherh_it_test.go b/registrarc/dispatcherh_it_test.go similarity index 99% rename from dispatcherh/dispatcherh_it_test.go rename to registrarc/dispatcherh_it_test.go index 35babf186..9f32d5b28 100644 --- a/dispatcherh/dispatcherh_it_test.go +++ b/registrarc/dispatcherh_it_test.go @@ -18,7 +18,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package dispatcherh +package registrarc import ( "bytes" diff --git a/dispatcherh/dispatcherh_test.go b/registrarc/dispatcherh_test.go similarity index 91% rename from dispatcherh/dispatcherh_test.go rename to registrarc/dispatcherh_test.go index dfea9a86d..494bbeeaf 100644 --- a/dispatcherh/dispatcherh_test.go +++ b/registrarc/dispatcherh_test.go @@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package dispatcherh +package registrarc import ( "net/http" @@ -54,10 +54,10 @@ func TestDispatcherHostsService(t *testing.T) { }, }, } - cfg.DispatcherHCfg().RegisterInterval = 100 * time.Millisecond - cfg.DispatcherHCfg().DispatchersConns = []string{"conn1"} + cfg.DispatcherHCfg().RefreshInterval = 100 * time.Millisecond + cfg.DispatcherHCfg().RegistrarSConns = []string{"conn1"} - ds := NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{})) + ds := NewRegistrarCService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{})) ds.registerHosts() @@ -117,7 +117,7 @@ func TestDispatcherHostsService(t *testing.T) { cfg.ListenCfg().RPCJSONListen = "2012" ds.registerHosts() - ds = NewDispatcherHService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{})) + ds = NewRegistrarCService(cfg, engine.NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{})) ds.Shutdown() stopChan := make(chan struct{}) close(stopChan) diff --git a/dispatcherh/lib_test.go b/registrarc/lib_test.go similarity index 98% rename from dispatcherh/lib_test.go rename to registrarc/lib_test.go index 2a7cf63fe..f60a86188 100644 --- a/dispatcherh/lib_test.go +++ b/registrarc/lib_test.go @@ -15,7 +15,7 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ -package dispatcherh +package registrarc import ( "errors" diff --git a/dispatcherh/libdispatcherh.go b/registrarc/libdispatcherh.go similarity index 67% rename from dispatcherh/libdispatcherh.go rename to registrarc/libdispatcherh.go index 183274bd7..00ce53a91 100644 --- a/dispatcherh/libdispatcherh.go +++ b/registrarc/libdispatcherh.go @@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package dispatcherh +package registrarc import ( "encoding/json" @@ -32,7 +32,7 @@ import ( ) // NewRegisterArgs creates the arguments for register hosts API -func NewRegisterArgs(cfg *config.CGRConfig, tnt string, hostCfgs []*config.DispatcherHRegistarCfg) (rargs *RegisterArgs, err error) { +func NewRegisterArgs(cfg *config.CGRConfig, tnt string, hostCfgs []*config.RemoteHost) (rargs *RegisterArgs, err error) { rargs = &RegisterArgs{ Tenant: tnt, Opts: make(map[string]interface{}), @@ -41,17 +41,17 @@ func NewRegisterArgs(cfg *config.CGRConfig, tnt string, hostCfgs []*config.Dispa for i, hostCfg := range hostCfgs { var port string if port, err = getConnPort(cfg, - hostCfg.RegisterTransport, - hostCfg.RegisterTLS); err != nil { + hostCfg.Transport, + hostCfg.TLS); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Unable to get the port because : %s", - utils.DispatcherH, err)) + utils.RegistrarC, err)) return } rargs.Hosts[i] = &RegisterHostCfg{ ID: hostCfg.ID, Port: port, - Transport: hostCfg.RegisterTransport, - TLS: hostCfg.RegisterTLS, + Transport: hostCfg.Transport, + TLS: hostCfg.TLS, } } return @@ -66,10 +66,11 @@ type RegisterArgs struct { // RegisterHostCfg the host config used to register type RegisterHostCfg struct { - ID string - Port string - Transport string - TLS bool + ID string + Port string + Transport string + TLS bool + Synchronous bool } // AsDispatcherHosts converts the arguments to DispatcherHosts @@ -85,8 +86,8 @@ func (rargs *RegisterArgs) AsDispatcherHosts(ip string) (dHs []*engine.Dispatche func (rhc *RegisterHostCfg) AsDispatcherHost(tnt, ip string) *engine.DispatcherHost { return &engine.DispatcherHost{ Tenant: tnt, - ID: rhc.ID, - Conn: &config.RemoteHost{ + RemoteHost: &config.RemoteHost{ + ID: rhc.ID, Address: ip + ":" + rhc.Port, Transport: rhc.Transport, TLS: rhc.TLS, @@ -95,7 +96,7 @@ func (rhc *RegisterHostCfg) AsDispatcherHost(tnt, ip string) *engine.DispatcherH } // NewUnregisterArgs creates the arguments for unregister hosts API -func NewUnregisterArgs(tnt string, hostCfgs []*config.DispatcherHRegistarCfg) (uargs *UnregisterArgs) { +func NewUnregisterArgs(tnt string, hostCfgs []*config.RemoteHost) (uargs *UnregisterArgs) { uargs = &UnregisterArgs{ Tenant: tnt, Opts: make(map[string]interface{}), @@ -114,8 +115,8 @@ type UnregisterArgs struct { IDs []string } -// Registar handdle for httpServer to register the dispatcher hosts -func Registar(w http.ResponseWriter, r *http.Request) { +// Registrar handdle for httpServer to register the dispatcher hosts +func Registrar(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() w.Header().Set("Content-Type", "application/json") var result interface{} = utils.OK @@ -127,7 +128,7 @@ func Registar(w http.ResponseWriter, r *http.Request) { } if err := utils.WriteServerResponse(w, id, result, errMessage); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Failed to write resonse because: %s", - utils.DispatcherH, err)) + utils.RegistrarC, err)) } } @@ -136,7 +137,7 @@ func register(req *http.Request) (*json.RawMessage, error) { sReq, err := utils.DecodeServerRequest(req.Body) if err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode request because: %s", - utils.DispatcherH, err)) + utils.RegistrarC, err)) return &id, err } var hasErrors bool @@ -144,37 +145,37 @@ func register(req *http.Request) (*json.RawMessage, error) { default: err = errors.New("rpc: can't find service " + sReq.Method) utils.Logger.Warning(fmt.Sprintf("<%s> Failed to register hosts because: %s", - utils.DispatcherH, err)) + utils.RegistrarC, err)) return sReq.Id, err - case utils.DispatcherHv1UnregisterHosts: + case utils.RegistrarSv1UnregisterDispatcherHosts: var args UnregisterArgs params := []interface{}{&args} if err = json.Unmarshal(*sReq.Params, ¶ms); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s", - utils.DispatcherH, err)) + utils.RegistrarC, err)) return sReq.Id, err } for _, id := range args.IDs { if err = engine.Cache.Remove(utils.CacheDispatcherHosts, utils.ConcatenatedKey(args.Tenant, id), true, utils.NonTransactional); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove DispatcherHost <%s> from cache because: %s", - utils.DispatcherH, id, err)) + utils.RegistrarC, id, err)) hasErrors = true continue } } - case utils.DispatcherHv1RegisterHosts: + case utils.RegistrarSv1RegisterDispatcherHosts: var dHs RegisterArgs params := []interface{}{&dHs} if err = json.Unmarshal(*sReq.Params, ¶ms); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s", - utils.DispatcherH, err)) + utils.RegistrarC, err)) return sReq.Id, err } var addr string if addr, err = utils.GetRemoteIP(req); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s", - utils.DispatcherH, err)) + utils.RegistrarC, err)) return sReq.Id, err } @@ -182,11 +183,59 @@ func register(req *http.Request) (*json.RawMessage, error) { if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.TenantID(), dH, nil, true, utils.NonTransactional); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Failed to set DispatcherHost <%s> in cache because: %s", - utils.DispatcherH, dH.TenantID(), err)) + utils.RegistrarC, dH.TenantID(), err)) hasErrors = true continue } } + + case utils.RegistrarSv1UnregisterRPCHosts: + var args UnregisterArgs + params := []interface{}{&args} + if err = json.Unmarshal(*sReq.Params, ¶ms); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s", + utils.RegistrarC, err)) + return sReq.Id, err + } + config.CgrConfig().LockSections(config.RPCConnsJsonName) + for _, connID := range args.IDs { + if err = engine.Cache.Remove(utils.CacheRPCConnections, connID, + true, utils.NonTransactional); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove connection <%s> in cache because: %s", + utils.RegistrarC, connID, err)) + hasErrors = true + } + } + config.CgrConfig().UnlockSections(config.RPCConnsJsonName) + case utils.RegistrarSv1RegisterRPCHosts: + var dHs RegisterArgs + params := []interface{}{&dHs} + if err = json.Unmarshal(*sReq.Params, ¶ms); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s", + utils.RegistrarC, err)) + return sReq.Id, err + } + var addr string + if addr, err = utils.GetRemoteIP(req); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s", + utils.RegistrarC, err)) + return sReq.Id, err + } + + cfgHosts := make(map[string]*config.RemoteHost) + for _, dH := range dHs.AsDispatcherHosts(addr) { + cfgHosts[dH.ID] = dH.RemoteHost + } + config.CgrConfig().LockSections(config.RPCConnsJsonName) + for connID := range config.UpdateRPCCons(config.CgrConfig().RPCConns(), cfgHosts) { + if err = engine.Cache.Remove(utils.CacheRPCConnections, connID, + true, utils.NonTransactional); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove connection <%s> in cache because: %s", + utils.RegistrarC, connID, err)) + hasErrors = true + } + } + config.CgrConfig().UnlockSections(config.RPCConnsJsonName) } if hasErrors { return sReq.Id, utils.ErrPartiallyExecuted diff --git a/dispatcherh/libdispatcherh_test.go b/registrarc/libdispatcherh_test.go similarity index 92% rename from dispatcherh/libdispatcherh_test.go rename to registrarc/libdispatcherh_test.go index c92943b57..4a0d0aaf8 100644 --- a/dispatcherh/libdispatcherh_test.go +++ b/registrarc/libdispatcherh_test.go @@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package dispatcherh +package registrarc import ( "bytes" @@ -56,8 +56,8 @@ func TestRegisterArgsAsDispatcherHosts(t *testing.T) { exp := []*engine.DispatcherHost{ { Tenant: "cgrates.org", - ID: "Host1", - Conn: &config.RemoteHost{ + RemoteHost: &config.RemoteHost{ + ID: "Host1", Address: "127.0.0.1:2012", TLS: true, Transport: utils.MetaJSON, @@ -65,8 +65,8 @@ func TestRegisterArgsAsDispatcherHosts(t *testing.T) { }, { Tenant: "cgrates.org", - ID: "Host2", - Conn: &config.RemoteHost{ + RemoteHost: &config.RemoteHost{ + ID: "Host2", Address: "127.0.0.1:2013", TLS: false, Transport: utils.MetaGOB, @@ -149,7 +149,7 @@ func TestRegister(t *testing.T) { if err != nil { t.Fatal(err) } - args := utils.NewServerRequest(utils.DispatcherHv1RegisterHosts, raJSON, id) + args := utils.NewServerRequest(utils.RegistrarSv1RegisterDispatcherHosts, raJSON, id) argsJSON, err := json.Marshal(args) if err != nil { t.Fatal(err) @@ -168,8 +168,8 @@ func TestRegister(t *testing.T) { host1 := &engine.DispatcherHost{ Tenant: "cgrates.org", - ID: "Host1", - Conn: &config.RemoteHost{ + RemoteHost: &config.RemoteHost{ + ID: "Host1", Address: "127.0.0.1:2012", TLS: true, Transport: utils.MetaJSON, @@ -177,8 +177,8 @@ func TestRegister(t *testing.T) { } host2 := &engine.DispatcherHost{ Tenant: "cgrates.org", - ID: "Host2", - Conn: &config.RemoteHost{ + RemoteHost: &config.RemoteHost{ + ID: "Host2", Address: "127.0.0.1:2013", TLS: false, Transport: utils.MetaGOB, @@ -210,7 +210,7 @@ func TestRegister(t *testing.T) { if err != nil { t.Fatal(err) } - uargs := utils.NewServerRequest(utils.DispatcherHv1UnregisterHosts, uaJSON, id) + uargs := utils.NewServerRequest(utils.RegistrarSv1UnregisterDispatcherHosts, uaJSON, id) uargsJSON, err := json.Marshal(uargs) if err != nil { t.Fatal(err) @@ -264,7 +264,7 @@ func TestRegister(t *testing.T) { if _, err := register(req); err == nil { t.Errorf("Expected error,received: nil") } - args2 := utils.NewServerRequest(utils.DispatcherHv1RegisterHosts, id, id) + args2 := utils.NewServerRequest(utils.RegistrarSv1RegisterDispatcherHosts, id, id) args2JSON, err := json.Marshal(args2) if err != nil { t.Fatal(err) @@ -273,7 +273,7 @@ func TestRegister(t *testing.T) { if _, err := register(req); err == nil { t.Errorf("Expected error,received: nil") } - args2 = utils.NewServerRequest(utils.DispatcherHv1UnregisterHosts, id, id) + args2 = utils.NewServerRequest(utils.RegistrarSv1UnregisterDispatcherHosts, id, id) args2JSON, err = json.Marshal(args2) if err != nil { t.Fatal(err) @@ -301,7 +301,7 @@ func (*errRecorder) Header() http.Header { return make(http.Header) } func (*errRecorder) Write([]byte) (int, error) { return 0, io.EOF } func (*errRecorder) WriteHeader(statusCode int) {} -func TestRegistar(t *testing.T) { +func TestRegistrar(t *testing.T) { w := httptest.NewRecorder() ra := &RegisterArgs{ Tenant: "cgrates.org", @@ -326,7 +326,7 @@ func TestRegistar(t *testing.T) { if err != nil { t.Fatal(err) } - args := utils.NewServerRequest(utils.DispatcherHv1RegisterHosts, raJSON, id) + args := utils.NewServerRequest(utils.RegistrarSv1RegisterDispatcherHosts, raJSON, id) argsJSON, err := json.Marshal(args) if err != nil { t.Fatal(err) @@ -337,18 +337,18 @@ func TestRegistar(t *testing.T) { } req.RemoteAddr = "127.0.0.1:2356" - Registar(w, req) + Registrar(w, req) exp := "{\"id\":1,\"result\":\"OK\",\"error\":null}\n" if w.Body.String() != exp { t.Errorf("Expected: %q ,received: %q", exp, w.Body.String()) } w = httptest.NewRecorder() - Registar(w, req) + Registrar(w, req) exp = "{\"id\":0,\"result\":null,\"error\":\"EOF\"}\n" if w.Body.String() != exp { t.Errorf("Expected: %q ,received: %q", exp, w.Body.String()) } - Registar(new(errRecorder), req) + Registrar(new(errRecorder), req) } diff --git a/services/dispatcherh.go b/services/registrarc.go similarity index 78% rename from services/dispatcherh.go rename to services/registrarc.go index d55fbd5a6..d32a27f0b 100644 --- a/services/dispatcherh.go +++ b/services/registrarc.go @@ -29,11 +29,11 @@ import ( "github.com/cgrates/cgrates/utils" ) -// NewDispatcherHostsService returns the Dispatcher Service -func NewDispatcherHostsService(cfg *config.CGRConfig, server *cores.Server, +// NewRegistrarCService returns the Dispatcher Service +func NewRegistrarCService(cfg *config.CGRConfig, server *cores.Server, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { - return &DispatcherHostsService{ + return &RegistrarCService{ cfg: cfg, server: server, connMgr: connMgr, @@ -42,8 +42,8 @@ func NewDispatcherHostsService(cfg *config.CGRConfig, server *cores.Server, } } -// DispatcherHostsService implements Service interface -type DispatcherHostsService struct { +// RegistrarCService implements Service interface +type RegistrarCService struct { sync.RWMutex cfg *config.CGRConfig server *cores.Server @@ -56,7 +56,7 @@ type DispatcherHostsService struct { } // Start should handle the sercive start -func (dspS *DispatcherHostsService) Start() (err error) { +func (dspS *RegistrarCService) Start() (err error) { if dspS.IsRunning() { return utils.ErrServiceAlreadyRunning } @@ -72,12 +72,12 @@ func (dspS *DispatcherHostsService) Start() (err error) { } // Reload handles the change of config -func (dspS *DispatcherHostsService) Reload() (err error) { +func (dspS *RegistrarCService) Reload() (err error) { return // for the momment nothing to reload } // Shutdown stops the service -func (dspS *DispatcherHostsService) Shutdown() (err error) { +func (dspS *RegistrarCService) Shutdown() (err error) { dspS.Lock() close(dspS.stopChan) dspS.dspS.Shutdown() @@ -87,18 +87,18 @@ func (dspS *DispatcherHostsService) Shutdown() (err error) { } // IsRunning returns if the service is running -func (dspS *DispatcherHostsService) IsRunning() bool { +func (dspS *RegistrarCService) IsRunning() bool { dspS.RLock() defer dspS.RUnlock() return dspS != nil && dspS.dspS != nil } // ServiceName returns the service name -func (dspS *DispatcherHostsService) ServiceName() string { - return utils.DispatcherH +func (dspS *RegistrarCService) ServiceName() string { + return utils.RegistrarC } // ShouldRun returns if the service should be running -func (dspS *DispatcherHostsService) ShouldRun() bool { +func (dspS *RegistrarCService) ShouldRun() bool { return dspS.cfg.DispatcherHCfg().Enabled } diff --git a/services/dispatcherh_it_test.go b/services/registrarc_it_test.go similarity index 95% rename from services/dispatcherh_it_test.go rename to services/registrarc_it_test.go index 2f84f56de..85f08fbd8 100644 --- a/services/dispatcherh_it_test.go +++ b/services/registrarc_it_test.go @@ -55,7 +55,7 @@ func TestDispatcherHReload(t *testing.T) { db := NewDataDBService(cfg, nil, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) connMngr := engine.NewConnManager(cfg, nil) - srv := NewDispatcherHostsService(cfg, server, connMngr, anz, srvDep) + srv := NewRegistrarCService(cfg, server, connMngr, anz, srvDep) srvMngr.AddServices(srv, NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz, srvDep), db) @@ -69,7 +69,7 @@ func TestDispatcherHReload(t *testing.T) { var reply string if err := cfg.V1ReloadConfig(&config.ReloadArgs{ Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "dispatcherh", "all_mongo"), - Section: config.DispatcherHJson, + Section: config.RegistrarCJson, }, &reply); err != nil { t.Fatal(err) } else if reply != utils.OK { @@ -90,7 +90,7 @@ func TestDispatcherHReload(t *testing.T) { t.Errorf("\nExpecting ,\n Received <%+v>", err) } cfg.DispatcherHCfg().Enabled = false - cfg.GetReloadChan(config.DispatcherHJson) <- struct{}{} + cfg.GetReloadChan(config.RegistrarCJson) <- struct{}{} time.Sleep(10 * time.Millisecond) if srv.IsRunning() { t.Errorf("Expected service to be down") diff --git a/services/dispatcherh_test.go b/services/registrarc_test.go similarity index 91% rename from services/dispatcherh_test.go rename to services/registrarc_test.go index 499737703..c3846ea2d 100644 --- a/services/dispatcherh_test.go +++ b/services/registrarc_test.go @@ -38,11 +38,11 @@ func TestDispatcherHCoverage(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan rpcclient.ClientConnector, 1), srvDep) rpcInternal := map[string]chan rpcclient.ClientConnector{} cM := engine.NewConnManager(cfg, rpcInternal) - srv := NewDispatcherHostsService(cfg, server, cM, anz, srvDep) + srv := NewRegistrarCService(cfg, server, cM, anz, srvDep) if srv == nil { t.Errorf("\nExpecting ,\n Received <%+v>", utils.ToJSON(srv)) } - srv2 := &DispatcherHostsService{ + srv2 := &RegistrarCService{ cfg: cfg, server: server, connMgr: cM, @@ -58,8 +58,8 @@ func TestDispatcherHCoverage(t *testing.T) { } serviceName := srv2.ServiceName() - if !reflect.DeepEqual(serviceName, utils.DispatcherH) { - t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.DispatcherH, serviceName) + if !reflect.DeepEqual(serviceName, utils.RegistrarC) { + t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.RegistrarC, serviceName) } shouldRun := srv2.ShouldRun() if !reflect.DeepEqual(shouldRun, false) { diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index fec8020ee..a14effa7d 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -243,8 +243,8 @@ func (srvMngr *ServiceManager) handleReload() { engine.Cache.Clear([]string{utils.CacheRPCConnections}) case <-srvMngr.GetConfig().GetReloadChan(config.SIPAgentJson): go srvMngr.reloadService(utils.SIPAgent) - case <-srvMngr.GetConfig().GetReloadChan(config.DispatcherHJson): - go srvMngr.reloadService(utils.DispatcherH) + case <-srvMngr.GetConfig().GetReloadChan(config.RegistrarCJson): + go srvMngr.reloadService(utils.RegistrarC) case <-srvMngr.GetConfig().GetReloadChan(config.HTTP_JSN): go srvMngr.reloadService(utils.GlobalVarS) case <-srvMngr.GetConfig().GetReloadChan(config.AccountSCfgJson): diff --git a/utils/consts.go b/utils/consts.go index df4473736..c44eea8fa 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -506,7 +506,7 @@ const ( MetaBlockerError = "*blocker_error" MetaConfig = "*config" MetaDispatchers = "*dispatchers" - MetaDispatcherh = "*dispatcherh" + MetaRegistrarC = "*registrarc" MetaDispatcherHosts = "*dispatcher_hosts" MetaFilters = "*filters" MetaCDRs = "*cdrs" @@ -1027,7 +1027,7 @@ const ( FilterS = "FilterS" ThresholdS = "ThresholdS" DispatcherS = "DispatcherS" - DispatcherH = "DispatcherH" + RegistrarC = "RegistrarC" LoaderS = "LoaderS" ChargerS = "ChargerS" CacheS = "CacheS" @@ -1736,10 +1736,13 @@ const ( DispatcherServicePing = "DispatcherService.Ping" ) -// DispatcherH APIs +// RegistrarS APIs const ( - DispatcherHv1RegisterHosts = "DispatcherHv1.RegisterHosts" - DispatcherHv1UnregisterHosts = "DispatcherHv1.UnregisterHosts" + RegistrarSv1RegisterDispatcherHosts = "RegistrarSv1.RegisterDispatcherHosts" + RegistrarSv1UnregisterDispatcherHosts = "RegistrarSv1.UnregisterDispatcherHosts" + + RegistrarSv1RegisterRPCHosts = "RegistrarSv1.RegisterRPCHosts" + RegistrarSv1UnregisterRPCHosts = "RegistrarSv1.UnregisterRPCHosts" ) // RateProfile APIs @@ -2145,15 +2148,15 @@ const ( // HTTPCfg const ( - HTTPJsonRPCURLCfg = "json_rpc_url" - DispatchersRegistrarURLCfg = "dispatchers_registrar_url" - HTTPWSURLCfg = "ws_url" - HTTPFreeswitchCDRsURLCfg = "freeswitch_cdrs_url" - HTTPCDRsURLCfg = "http_cdrs" - HTTPUseBasicAuthCfg = "use_basic_auth" - HTTPAuthUsersCfg = "auth_users" - HTTPClientOptsCfg = "client_opts" - ConfigsURL = "configs_url" + HTTPJsonRPCURLCfg = "json_rpc_url" + RegistrarSURLCfg = "registrars_url" + HTTPWSURLCfg = "ws_url" + HTTPFreeswitchCDRsURLCfg = "freeswitch_cdrs_url" + HTTPCDRsURLCfg = "http_cdrs" + HTTPUseBasicAuthCfg = "use_basic_auth" + HTTPAuthUsersCfg = "auth_users" + HTTPClientOptsCfg = "client_opts" + ConfigsURL = "configs_url" HTTPClientTLSClientConfigCfg = "skipTlsVerify" HTTPClientTLSHandshakeTimeoutCfg = "tlsHandshakeTimeout" @@ -2478,13 +2481,13 @@ const ( CacheDumpFieldsCfg = "cache_dump_fields" ) -// DispatcherHCfg +// RegistrarCCfg const ( - DispatchersConnsCfg = "dispatchers_conns" - HostsCfg = "hosts" - RegisterIntervalCfg = "register_interval" - RegisterTransportCfg = "register_transport" - RegisterTLSCfg = "register_tls" + RPCCfg = "rpc" + DispatcherCfg = "dispatcher" + RegistrarsConnsCfg = "registrars_conns" + HostsCfg = "hosts" + RefreshIntervalCfg = "refresh_interval" ) // APIBanCfg diff --git a/utils/net.go b/utils/net.go index 982ecc961..00ed14e82 100644 --- a/utils/net.go +++ b/utils/net.go @@ -111,7 +111,7 @@ func DecodeServerRequest(r io.Reader) (req *serverRequest, err error) { return } -// NewServerRequest used in dispatcherh tests +// NewServerRequest used in registrarc tests func NewServerRequest(method string, params, id json.RawMessage) *serverRequest { return &serverRequest{ Method: method,