diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ccfb5927f..08bc376d0 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -114,7 +114,7 @@ func initServiceManagerV1(internalServiceManagerChan chan rpcclient.ClientConnec internalServiceManagerChan <- srvMngr } -func startRpc(server *utils.Server, internalRaterChan, +func startRPC(server *utils.Server, internalRaterChan, internalCdrSChan, internalRsChan, internalStatSChan, internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan, internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan, @@ -439,6 +439,7 @@ func main() { internalCDRServerChan := make(chan rpcclient.ClientConnector, 1) internalAttributeSChan := make(chan rpcclient.ClientConnector, 1) internalDispatcherSChan := make(chan rpcclient.ClientConnector, 1) + internalDispatcherHChan := make(chan rpcclient.ClientConnector, 1) internalSessionSChan := make(chan rpcclient.ClientConnector, 1) internalChargerSChan := make(chan rpcclient.ClientConnector, 1) internalThresholdSChan := make(chan rpcclient.ClientConnector, 1) @@ -480,6 +481,7 @@ func main() { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): internalEEsChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRateS): internalRateSChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatcherh): internalDispatcherHChan, }) dmService := services.NewDataDBService(cfg, connManager) @@ -525,6 +527,7 @@ func main() { srvManager := servmanager.NewServiceManager(cfg, exitChan) attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan) dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager) + dspH := services.NewDispatcherHostsService(cfg, server, internalDispatcherSChan, connManager) chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server, internalChargerSChan, connManager) tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan) @@ -567,7 +570,7 @@ func main() { services.NewRadiusAgent(cfg, filterSChan, exitChan, connManager), // partial reload services.NewDiameterAgent(cfg, filterSChan, exitChan, connManager), // partial reload services.NewHTTPAgent(cfg, filterSChan, server, connManager), // no reload - ldrs, anz, dspS, dmService, storDBService, + ldrs, anz, dspS, dspH, dmService, storDBService, services.NewEventExporterService(cfg, filterSChan, connManager, server, exitChan, internalEEsChan), services.NewRateService(cfg, cacheS, filterSChan, dmService, @@ -605,6 +608,9 @@ func main() { engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan) engine.IntRPC.AddInternalRPCClient(utils.RALsV1, internalRALsChan) engine.IntRPC.AddInternalRPCClient(utils.RateSv1, internalRateSChan) + engine.IntRPC.AddInternalRPCClient(utils.EventExporterSv1, internalEEsChan) + engine.IntRPC.AddInternalRPCClient(utils.DispatcherSv1, internalDispatcherSChan) + // engine.IntRPC.AddInternalRPCClient(utils.DispatcherHv1, internalDispatcherHChan) initConfigSv1(internalConfigChan, server) @@ -613,7 +619,7 @@ func main() { } // Serve rpc connections - go startRpc(server, internalResponderChan, internalCDRServerChan, + go startRPC(server, internalResponderChan, internalCDRServerChan, internalResourceSChan, internalStatSChan, internalAttributeSChan, internalChargerSChan, internalThresholdSChan, internalRouteSChan, internalSessionSChan, internalAnalyzerSChan, diff --git a/config/config.go b/config/config.go index e975f0cc4..9cc3c6283 100755 --- a/config/config.go +++ b/config/config.go @@ -1521,6 +1521,8 @@ func (cfg *CGRConfig) reloadSections(sections ...string) (err error) { cfg.rldChans[SIPAgentJson] <- struct{}{} case RateSJson: cfg.rldChans[RateSJson] <- struct{}{} + case DispatcherHJson: + cfg.rldChans[DispatcherHJson] <- struct{}{} } return } diff --git a/config/config_json.go b/config/config_json.go index d42e93452..6fbc06b9e 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -69,7 +69,7 @@ var ( CACHE_JSN, FilterSjsn, RALS_JSN, CDRS_JSN, ERsJson, SessionSJson, AsteriskAgentJSN, FreeSWITCHAgentJSN, KamailioAgentJSN, DA_JSN, RA_JSN, HttpAgentJson, DNSAgentJson, ATTRIBUTE_JSN, ChargerSCfgJson, RESOURCES_JSON, STATS_JSON, THRESHOLDS_JSON, RouteSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson, - AnalyzerCfgJson, ApierS, EEsJson, RateSJson, SIPAgentJson} + AnalyzerCfgJson, ApierS, EEsJson, RateSJson, SIPAgentJson, DispatcherHJson} ) // Loads the json config out of io.Reader, eg other sources than file, maybe over http diff --git a/config/configsanity.go b/config/configsanity.go index 04b09becb..9ab4b3c4c 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -24,6 +24,7 @@ import ( "strings" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) // Exported in cgr-engine @@ -669,7 +670,7 @@ func (cfg *CGRConfig) checkConfigSanity() error { // FilterS sanity check for _, connID := range cfg.filterSCfg.StatSConns { if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.statsCfg.Enabled { - return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.StatS, utils.FilterS) + return fmt.Errorf("<%s> not enabled but requested by <%s> component", utils.StatS, utils.FilterS) } if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) { return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.FilterS, connID) @@ -677,7 +678,7 @@ func (cfg *CGRConfig) checkConfigSanity() error { } for _, connID := range cfg.filterSCfg.ResourceSConns { if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.resourceSCfg.Enabled { - return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ResourceS, utils.FilterS) + return fmt.Errorf("<%s> not enabled but requested by <%s> component", utils.ResourceS, utils.FilterS) } if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) { return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.FilterS, connID) @@ -685,12 +686,47 @@ func (cfg *CGRConfig) checkConfigSanity() error { } for _, connID := range cfg.filterSCfg.ApierSConns { if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.apier.Enabled { - return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ApierS, utils.FilterS) + return fmt.Errorf("<%s> not enabled but requested by <%s> component", utils.ApierS, utils.FilterS) } if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) { return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.FilterS, connID) } } + if cfg.dispatcherHCfg.Enabled { + if len(cfg.dispatcherHCfg.HostIDs) == 0 { + return fmt.Errorf("<%s> missing dispatcher host IDs", utils.DispatcherH) + } + if cfg.dispatcherHCfg.RegisterInterval <= 0 { + return fmt.Errorf("<%s> the register imterval needs to be bigger than 0", utils.DispatcherH) + } + if !utils.SliceHasMember([]string{utils.MetaGOB, rpcclient.HTTPjson, utils.MetaJSON}, cfg.dispatcherHCfg.RegisterTransport) { + return fmt.Errorf("<%s> unsupported transport: <%s>", utils.DispatcherH, cfg.dispatcherHCfg.RegisterTransport) + } + if len(cfg.dispatcherHCfg.DispatchersConns) == 0 { + return fmt.Errorf("<%s> missing dispatcher connection IDs", utils.DispatcherH) + } + for _, connID := range cfg.dispatcherHCfg.DispatchersConns { + if connID == utils.MetaInternal { + return fmt.Errorf("<%s> internal connection IDs are not supported", utils.DispatcherH) + } + connCfg, has := cfg.rpcConns[connID] + if !has { + return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.DispatcherH, connID) + } + if len(connCfg.Conns) != 0 { + return fmt.Errorf("<%s> connection with id: <%s> needs to have only one host", utils.DispatcherH, connID) + } + if connCfg.Conns[0].Transport != rpcclient.HTTPjson { + return fmt.Errorf("<%s> connection with id: <%s> unsupported transport <%s>", utils.DispatcherH, connID, connCfg.Conns[0].Transport) + } + } + + /* + DispatchersConns []string + RegisterTransport string + */ + } + return nil } diff --git a/config/configsanity_test.go b/config/configsanity_test.go index 0391c8d3d..2bf0e2ca2 100644 --- a/config/configsanity_test.go +++ b/config/configsanity_test.go @@ -132,11 +132,9 @@ func TestConfigSanityLoaders(t *testing.T) { &LoaderSCfg{ Enabled: true, TpInDir: "/not/exist", - Data: []*LoaderDataType{ - &LoaderDataType{ - Type: "strsdfing", - }, - }, + Data: []*LoaderDataType{{ + Type: "strsdfing", + }}, }, } expected := " nonexistent folder: /not/exist" @@ -149,11 +147,9 @@ func TestConfigSanityLoaders(t *testing.T) { Enabled: true, TpInDir: "/", TpOutDir: "/", - Data: []*LoaderDataType{ - &LoaderDataType{ - Type: "wrongtype", - }, - }, + Data: []*LoaderDataType{{ + Type: "wrongtype", + }}, }, } expected = " unsupported data type wrongtype" @@ -166,17 +162,13 @@ func TestConfigSanityLoaders(t *testing.T) { Enabled: true, TpInDir: "/", TpOutDir: "/", - Data: []*LoaderDataType{ - &LoaderDataType{ + Data: []*LoaderDataType{{ + Type: utils.MetaStats, + Fields: []*FCTemplate{{ Type: utils.MetaStats, - Fields: []*FCTemplate{ - &FCTemplate{ - Type: utils.MetaStats, - Tag: "test1", - }, - }, - }, - }, + Tag: "test1", + }}, + }}, }, } expected = " invalid field type *stats for *stats at test1" @@ -637,38 +629,32 @@ func TestConfigSanityEventReader(t *testing.T) { t.Errorf("Expecting: %+q received: %+q", expected, err) } cfg.sessionSCfg.Enabled = true - cfg.ersCfg.Readers = []*EventReaderCfg{ - &EventReaderCfg{ - ID: "test", - Type: "wrongtype", - }, - } + cfg.ersCfg.Readers = []*EventReaderCfg{{ + ID: "test", + Type: "wrongtype", + }} expected = " unsupported data type: wrongtype for reader with ID: test" if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } - cfg.ersCfg.Readers = []*EventReaderCfg{ - &EventReaderCfg{ - ID: "test2", - Type: utils.MetaFileCSV, - ProcessedPath: "not/a/path", - }, - } + cfg.ersCfg.Readers = []*EventReaderCfg{{ + ID: "test2", + Type: utils.MetaFileCSV, + ProcessedPath: "not/a/path", + }} expected = " nonexistent folder: not/a/path for reader with ID: test2" if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } - cfg.ersCfg.Readers = []*EventReaderCfg{ - &EventReaderCfg{ - ID: "test3", - Type: utils.MetaFileCSV, - ProcessedPath: "/", - SourcePath: "/", - FieldSep: "", - }, - } + cfg.ersCfg.Readers = []*EventReaderCfg{{ + ID: "test3", + Type: utils.MetaFileCSV, + ProcessedPath: "/", + SourcePath: "/", + FieldSep: "", + }} expected = " empty FieldSep for reader with ID: test3" if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) @@ -728,7 +714,7 @@ func TestConfigSanityDataDB(t *testing.T) { cfg.cacheCfg = &CacheCfg{ Partitions: map[string]*CacheParamCfg{ - utils.CacheTimings: &CacheParamCfg{ + utils.CacheTimings: { Limit: 0, }, }, @@ -738,7 +724,7 @@ func TestConfigSanityDataDB(t *testing.T) { } cfg.cacheCfg = &CacheCfg{ Partitions: map[string]*CacheParamCfg{ - utils.CacheAccounts: &CacheParamCfg{ + utils.CacheAccounts: { Limit: 1, }, }, @@ -771,7 +757,7 @@ func TestConfigSanityDataDB(t *testing.T) { cfg.thresholdSCfg.Enabled = false cfg.dataDbCfg.Items = map[string]*ItemOpt{ - "test1": &ItemOpt{ + "test1": { Remote: true, }, } @@ -781,7 +767,7 @@ func TestConfigSanityDataDB(t *testing.T) { } cfg.dataDbCfg.Items = map[string]*ItemOpt{ - "test2": &ItemOpt{ + "test2": { Remote: false, Replicate: true, }, @@ -838,12 +824,12 @@ func TestConfigSanityDispatcher(t *testing.T) { func TestConfigSanityCacheS(t *testing.T) { cfg, _ = NewDefaultCGRConfig() - cfg.cacheCfg.Partitions = map[string]*CacheParamCfg{"wrong_partition_name": &CacheParamCfg{Limit: 10}} + cfg.cacheCfg.Partitions = map[string]*CacheParamCfg{"wrong_partition_name": {Limit: 10}} if err := cfg.checkConfigSanity(); err == nil || err.Error() != " partition not defined" { t.Error(err) } - cfg.cacheCfg.Partitions = map[string]*CacheParamCfg{utils.CacheLoadIDs: &CacheParamCfg{Limit: 9}} + cfg.cacheCfg.Partitions = map[string]*CacheParamCfg{utils.CacheLoadIDs: {Limit: 9}} if err := cfg.checkConfigSanity(); err != nil { t.Error(err) } @@ -853,7 +839,7 @@ func TestConfigSanityFilterS(t *testing.T) { cfg, _ = NewDefaultCGRConfig() cfg.filterSCfg.StatSConns = []string{utils.MetaInternal} - if err := cfg.checkConfigSanity(); err == nil || err.Error() != " not enabled but requested by component." { + if err := cfg.checkConfigSanity(); err == nil || err.Error() != " not enabled but requested by component" { t.Error(err) } cfg.filterSCfg.StatSConns = []string{"test"} @@ -865,7 +851,7 @@ func TestConfigSanityFilterS(t *testing.T) { cfg.filterSCfg.ResourceSConns = []string{utils.MetaInternal} - if err := cfg.checkConfigSanity(); err == nil || err.Error() != " not enabled but requested by component." { + if err := cfg.checkConfigSanity(); err == nil || err.Error() != " not enabled but requested by component" { t.Error(err) } cfg.filterSCfg.ResourceSConns = []string{"test"} @@ -877,7 +863,7 @@ func TestConfigSanityFilterS(t *testing.T) { cfg.filterSCfg.ApierSConns = []string{utils.MetaInternal} - if err := cfg.checkConfigSanity(); err == nil || err.Error() != " not enabled but requested by component." { + if err := cfg.checkConfigSanity(); err == nil || err.Error() != " not enabled but requested by component" { t.Error(err) } cfg.filterSCfg.ApierSConns = []string{"test"} diff --git a/dispatcherh/dispatcherh.go b/dispatcherh/dispatcherh.go index 1a0f77804..53f9a014d 100644 --- a/dispatcherh/dispatcherh.go +++ b/dispatcherh/dispatcherh.go @@ -28,12 +28,12 @@ import ( ) // NewDispatcherHService constructs a DispatcherHService -func NewDispatcherHService(dm *engine.DataManager, - cfg *config.CGRConfig, fltrS *engine.FilterS, +func NewDispatcherHService(cfg *config.CGRConfig, connMgr *engine.ConnManager) (*DispatcherHostsService, error) { return &DispatcherHostsService{ cfg: cfg, connMgr: connMgr, + stop: make(chan struct{}), }, nil } @@ -42,6 +42,7 @@ func NewDispatcherHService(dm *engine.DataManager, type DispatcherHostsService struct { cfg *config.CGRConfig connMgr *engine.ConnManager + stop chan struct{} } // ListenAndServe will initialize the service @@ -52,6 +53,8 @@ func (dhS *DispatcherHostsService) ListenAndServe(exitChan chan bool) (err error return } select { + case <-dhS.stop: + return case e := <-exitChan: exitChan <- e // put back for the others listening for shutdown request return @@ -63,6 +66,8 @@ func (dhS *DispatcherHostsService) ListenAndServe(exitChan chan bool) (err error // Shutdown is called to shutdown the service func (dhS *DispatcherHostsService) Shutdown() error { utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.DispatcherH)) + dhS.unregisterHosts() + close(dhS.stop) utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.DispatcherH)) return nil } @@ -89,7 +94,7 @@ func (dhS *DispatcherHostsService) registerHosts() (err error) { dh.Conns[0] = conn } var rply string - if err = dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1RegisterHosts, dHs, &rply); err != nil { + if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1RegisterHosts, dHs, &rply); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s", utils.DispatcherH, connID, err)) continue @@ -101,3 +106,22 @@ func (dhS *DispatcherHostsService) registerHosts() (err error) { } return } + +func (dhS *DispatcherHostsService) unregisterHosts() { + var rply string + for _, connID := range dhS.cfg.DispatcherHCfg().DispatchersConns { + if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1UnregisterHosts, dhS.cfg.DispatcherHCfg().HostIDs, &rply); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s", + utils.DispatcherH, connID, err)) + continue + } else if rply != utils.OK { + utils.Logger.Warning(fmt.Sprintf("<%s> Unexpected reply recieved when setting the hosts: %s", + utils.DispatcherH, rply)) + continue + } + } +} + +func (dhS *DispatcherHostsService) Call(_ string, _, _ interface{}) error { + return utils.ErrNotImplemented +} diff --git a/dispatcherh/libdispatcherh.go b/dispatcherh/libdispatcherh.go index 76f87d7f0..fda89a1c1 100644 --- a/dispatcherh/libdispatcherh.go +++ b/dispatcherh/libdispatcherh.go @@ -55,36 +55,53 @@ func register(req *http.Request) (*json.RawMessage, error) { utils.DispatcherH, err)) return nil, err } - if sReq.Method != utils.DispatcherHv1RegisterHosts { + switch sReq.Method { + default: err = errors.New("rpc: can't find service " + sReq.Method) utils.Logger.Warning(fmt.Sprintf("<%s> Failed to register hosts because: %s", utils.DispatcherH, err)) return sReq.Id, err - } - var dHs []*engine.DispatcherHost - params := []interface{}{dHs} - if err = json.Unmarshal(*sReq.Params, ¶ms); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s", - utils.DispatcherH, err)) - return sReq.Id, err - } - var addr string - if addr, err = getIP(req); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s", - utils.DispatcherH, err)) - return sReq.Id, err - } - - for _, dH := range dHs { - if len(dH.Conns) != 1 { // ignore the hosts with no connections or more - continue + case utils.DispatcherHv1UnregisterHosts: + var dHIDs []string + params := []interface{}{dHIDs} + if err = json.Unmarshal(*sReq.Params, ¶ms); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s", + utils.DispatcherH, err)) + return sReq.Id, err } - dH.Conns[0].Address = addr + dH.Conns[0].Address // the address contains the port - if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.Tenant, dH, nil, - false, utils.NonTransactional); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> Failed to set DispatcherHost <%s> in cache because: %s", - utils.DispatcherH, dH.TenantID(), err)) - continue + for _, id := range dHIDs { + if err = engine.Cache.Remove(utils.CacheDispatcherHosts, id, false, utils.NonTransactional); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove DispatcherHost <%s> from cache because: %s", + utils.DispatcherH, id, err)) + continue + } + } + case utils.DispatcherHv1RegisterHosts: + var dHs []*engine.DispatcherHost + params := []interface{}{dHs} + if err = json.Unmarshal(*sReq.Params, ¶ms); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s", + utils.DispatcherH, err)) + return sReq.Id, err + } + var addr string + if addr, err = getIP(req); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s", + utils.DispatcherH, err)) + return sReq.Id, err + } + + for _, dH := range dHs { + if len(dH.Conns) != 1 { // ignore the hosts with no connections or more + continue + } + dH.Conns[0].Address = addr + dH.Conns[0].Address // the address contains the port + if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.Tenant, dH, nil, + false, utils.NonTransactional); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Failed to set DispatcherHost <%s> in cache because: %s", + utils.DispatcherH, dH.TenantID(), err)) + continue + } } } return sReq.Id, nil diff --git a/services/dispatcherh.go b/services/dispatcherh.go new file mode 100644 index 000000000..0dce19156 --- /dev/null +++ b/services/dispatcherh.go @@ -0,0 +1,108 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/dispatcherh" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewDispatcherHostsService returns the Dispatcher Service +func NewDispatcherHostsService(cfg *config.CGRConfig, server *utils.Server, + internalChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager) servmanager.Service { + return &DispatcherHostsService{ + connChan: internalChan, + cfg: cfg, + server: server, + connMgr: connMgr, + } +} + +// DispatcherHostsService implements Service interface +type DispatcherHostsService struct { + sync.RWMutex + cfg *config.CGRConfig + server *utils.Server + connMgr *engine.ConnManager + + dspS *dispatcherh.DispatcherHostsService + // rpc *v1.DispatcherHSv1 + connChan chan rpcclient.ClientConnector +} + +// Start should handle the sercive start +func (dspS *DispatcherHostsService) Start() (err error) { + if dspS.IsRunning() { + return utils.ErrServiceAlreadyRunning + } + utils.Logger.Info("Starting CGRateS Dispatcher service.") + dspS.Lock() + defer dspS.Unlock() + + if dspS.dspS, err = dispatcherh.NewDispatcherHService(dspS.cfg, dspS.connMgr); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherH, err.Error())) + return + } + + dspS.connChan <- dspS.dspS + + return +} + +// Reload handles the change of config +func (dspS *DispatcherHostsService) Reload() (err error) { + return // for the momment nothing to reload +} + +// Shutdown stops the service +func (dspS *DispatcherHostsService) Shutdown() (err error) { + dspS.Lock() + defer dspS.Unlock() + if err = dspS.dspS.Shutdown(); err != nil { + return + } + dspS.dspS = nil + // dspS.rpc = nil + <-dspS.connChan + return +} + +// IsRunning returns if the service is running +func (dspS *DispatcherHostsService) IsRunning() bool { + dspS.RLock() + defer dspS.RUnlock() + return dspS != nil && dspS.dspS != nil +} + +// ServiceName returns the service name +func (dspS *DispatcherHostsService) ServiceName() string { + return utils.DispatcherH +} + +// ShouldRun returns if the service should be running +func (dspS *DispatcherHostsService) ShouldRun() bool { + return dspS.cfg.DispatcherHCfg().Enabled +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 2060d3a0c..4215d4712 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -305,6 +305,10 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(utils.SIPAgent); err != nil { return } + case <-srvMngr.GetConfig().GetReloadChan(config.DispatcherHJson): + if err = srvMngr.reloadService(utils.DispatcherH); err != nil { + return + } } // handle RPC server } diff --git a/utils/consts.go b/utils/consts.go index 559b7df93..93b265fe6 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -449,6 +449,7 @@ const ( MetaBlockerError = "*blocker_error" MetaConfig = "*config" MetaDispatchers = "*dispatchers" + MetaDispatcherh = "*dispatcherh" MetaDispatcherHosts = "*dispatcher_hosts" MetaFilters = "*filters" MetaCDRs = "*cdrs" @@ -1537,6 +1538,7 @@ const ( // DispatcherS APIs const ( + DispatcherSv1 = "DispatcherSv1" DispatcherSv1Ping = "DispatcherSv1.Ping" DispatcherSv1GetProfileForEvent = "DispatcherSv1.GetProfileForEvent" DispatcherSv1Apier = "DispatcherSv1.Apier" @@ -1545,7 +1547,8 @@ const ( // DispatcherH APIs const ( - DispatcherHv1RegisterHosts = "DispatcherHv1.RegisterHosts" + DispatcherHv1RegisterHosts = "DispatcherHv1.RegisterHosts" + DispatcherHv1UnregisterHosts = "DispatcherHv1.UnregisterHosts" ) // RateProfile APIs