mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
Added DispatcherH as a service
This commit is contained in:
committed by
Dan Christian Bogos
parent
4ab8185941
commit
ef84af834c
@@ -114,7 +114,7 @@ func initServiceManagerV1(internalServiceManagerChan chan rpcclient.ClientConnec
|
||||
internalServiceManagerChan <- srvMngr
|
||||
}
|
||||
|
||||
func startRpc(server *utils.Server, internalRaterChan,
|
||||
func startRPC(server *utils.Server, internalRaterChan,
|
||||
internalCdrSChan, internalRsChan, internalStatSChan,
|
||||
internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan,
|
||||
internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan,
|
||||
@@ -439,6 +439,7 @@ func main() {
|
||||
internalCDRServerChan := make(chan rpcclient.ClientConnector, 1)
|
||||
internalAttributeSChan := make(chan rpcclient.ClientConnector, 1)
|
||||
internalDispatcherSChan := make(chan rpcclient.ClientConnector, 1)
|
||||
internalDispatcherHChan := make(chan rpcclient.ClientConnector, 1)
|
||||
internalSessionSChan := make(chan rpcclient.ClientConnector, 1)
|
||||
internalChargerSChan := make(chan rpcclient.ClientConnector, 1)
|
||||
internalThresholdSChan := make(chan rpcclient.ClientConnector, 1)
|
||||
@@ -480,6 +481,7 @@ func main() {
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): internalEEsChan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRateS): internalRateSChan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatcherh): internalDispatcherHChan,
|
||||
})
|
||||
|
||||
dmService := services.NewDataDBService(cfg, connManager)
|
||||
@@ -525,6 +527,7 @@ func main() {
|
||||
srvManager := servmanager.NewServiceManager(cfg, exitChan)
|
||||
attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan)
|
||||
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager)
|
||||
dspH := services.NewDispatcherHostsService(cfg, server, internalDispatcherSChan, connManager)
|
||||
chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server,
|
||||
internalChargerSChan, connManager)
|
||||
tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan)
|
||||
@@ -567,7 +570,7 @@ func main() {
|
||||
services.NewRadiusAgent(cfg, filterSChan, exitChan, connManager), // partial reload
|
||||
services.NewDiameterAgent(cfg, filterSChan, exitChan, connManager), // partial reload
|
||||
services.NewHTTPAgent(cfg, filterSChan, server, connManager), // no reload
|
||||
ldrs, anz, dspS, dmService, storDBService,
|
||||
ldrs, anz, dspS, dspH, dmService, storDBService,
|
||||
services.NewEventExporterService(cfg, filterSChan,
|
||||
connManager, server, exitChan, internalEEsChan),
|
||||
services.NewRateService(cfg, cacheS, filterSChan, dmService,
|
||||
@@ -605,6 +608,9 @@ func main() {
|
||||
engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.RALsV1, internalRALsChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.RateSv1, internalRateSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.EventExporterSv1, internalEEsChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.DispatcherSv1, internalDispatcherSChan)
|
||||
// engine.IntRPC.AddInternalRPCClient(utils.DispatcherHv1, internalDispatcherHChan)
|
||||
|
||||
initConfigSv1(internalConfigChan, server)
|
||||
|
||||
@@ -613,7 +619,7 @@ func main() {
|
||||
}
|
||||
|
||||
// Serve rpc connections
|
||||
go startRpc(server, internalResponderChan, internalCDRServerChan,
|
||||
go startRPC(server, internalResponderChan, internalCDRServerChan,
|
||||
internalResourceSChan, internalStatSChan,
|
||||
internalAttributeSChan, internalChargerSChan, internalThresholdSChan,
|
||||
internalRouteSChan, internalSessionSChan, internalAnalyzerSChan,
|
||||
|
||||
@@ -1521,6 +1521,8 @@ func (cfg *CGRConfig) reloadSections(sections ...string) (err error) {
|
||||
cfg.rldChans[SIPAgentJson] <- struct{}{}
|
||||
case RateSJson:
|
||||
cfg.rldChans[RateSJson] <- struct{}{}
|
||||
case DispatcherHJson:
|
||||
cfg.rldChans[DispatcherHJson] <- struct{}{}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -69,7 +69,7 @@ var (
|
||||
CACHE_JSN, FilterSjsn, RALS_JSN, CDRS_JSN, ERsJson, SessionSJson, AsteriskAgentJSN, FreeSWITCHAgentJSN,
|
||||
KamailioAgentJSN, DA_JSN, RA_JSN, HttpAgentJson, DNSAgentJson, ATTRIBUTE_JSN, ChargerSCfgJson, RESOURCES_JSON, STATS_JSON,
|
||||
THRESHOLDS_JSON, RouteSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson,
|
||||
AnalyzerCfgJson, ApierS, EEsJson, RateSJson, SIPAgentJson}
|
||||
AnalyzerCfgJson, ApierS, EEsJson, RateSJson, SIPAgentJson, DispatcherHJson}
|
||||
)
|
||||
|
||||
// Loads the json config out of io.Reader, eg other sources than file, maybe over http
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// Exported in cgr-engine
|
||||
@@ -669,7 +670,7 @@ func (cfg *CGRConfig) checkConfigSanity() error {
|
||||
// FilterS sanity check
|
||||
for _, connID := range cfg.filterSCfg.StatSConns {
|
||||
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.statsCfg.Enabled {
|
||||
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.StatS, utils.FilterS)
|
||||
return fmt.Errorf("<%s> not enabled but requested by <%s> component", utils.StatS, utils.FilterS)
|
||||
}
|
||||
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
|
||||
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.FilterS, connID)
|
||||
@@ -677,7 +678,7 @@ func (cfg *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
for _, connID := range cfg.filterSCfg.ResourceSConns {
|
||||
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.resourceSCfg.Enabled {
|
||||
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ResourceS, utils.FilterS)
|
||||
return fmt.Errorf("<%s> not enabled but requested by <%s> component", utils.ResourceS, utils.FilterS)
|
||||
}
|
||||
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
|
||||
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.FilterS, connID)
|
||||
@@ -685,12 +686,47 @@ func (cfg *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
for _, connID := range cfg.filterSCfg.ApierSConns {
|
||||
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.apier.Enabled {
|
||||
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ApierS, utils.FilterS)
|
||||
return fmt.Errorf("<%s> not enabled but requested by <%s> component", utils.ApierS, utils.FilterS)
|
||||
}
|
||||
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
|
||||
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.FilterS, connID)
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.dispatcherHCfg.Enabled {
|
||||
if len(cfg.dispatcherHCfg.HostIDs) == 0 {
|
||||
return fmt.Errorf("<%s> missing dispatcher host IDs", utils.DispatcherH)
|
||||
}
|
||||
if cfg.dispatcherHCfg.RegisterInterval <= 0 {
|
||||
return fmt.Errorf("<%s> the register imterval needs to be bigger than 0", utils.DispatcherH)
|
||||
}
|
||||
if !utils.SliceHasMember([]string{utils.MetaGOB, rpcclient.HTTPjson, utils.MetaJSON}, cfg.dispatcherHCfg.RegisterTransport) {
|
||||
return fmt.Errorf("<%s> unsupported transport: <%s>", utils.DispatcherH, cfg.dispatcherHCfg.RegisterTransport)
|
||||
}
|
||||
if len(cfg.dispatcherHCfg.DispatchersConns) == 0 {
|
||||
return fmt.Errorf("<%s> missing dispatcher connection IDs", utils.DispatcherH)
|
||||
}
|
||||
for _, connID := range cfg.dispatcherHCfg.DispatchersConns {
|
||||
if connID == utils.MetaInternal {
|
||||
return fmt.Errorf("<%s> internal connection IDs are not supported", utils.DispatcherH)
|
||||
}
|
||||
connCfg, has := cfg.rpcConns[connID]
|
||||
if !has {
|
||||
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.DispatcherH, connID)
|
||||
}
|
||||
if len(connCfg.Conns) != 0 {
|
||||
return fmt.Errorf("<%s> connection with id: <%s> needs to have only one host", utils.DispatcherH, connID)
|
||||
}
|
||||
if connCfg.Conns[0].Transport != rpcclient.HTTPjson {
|
||||
return fmt.Errorf("<%s> connection with id: <%s> unsupported transport <%s>", utils.DispatcherH, connID, connCfg.Conns[0].Transport)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
DispatchersConns []string
|
||||
RegisterTransport string
|
||||
*/
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -132,11 +132,9 @@ func TestConfigSanityLoaders(t *testing.T) {
|
||||
&LoaderSCfg{
|
||||
Enabled: true,
|
||||
TpInDir: "/not/exist",
|
||||
Data: []*LoaderDataType{
|
||||
&LoaderDataType{
|
||||
Type: "strsdfing",
|
||||
},
|
||||
},
|
||||
Data: []*LoaderDataType{{
|
||||
Type: "strsdfing",
|
||||
}},
|
||||
},
|
||||
}
|
||||
expected := "<LoaderS> nonexistent folder: /not/exist"
|
||||
@@ -149,11 +147,9 @@ func TestConfigSanityLoaders(t *testing.T) {
|
||||
Enabled: true,
|
||||
TpInDir: "/",
|
||||
TpOutDir: "/",
|
||||
Data: []*LoaderDataType{
|
||||
&LoaderDataType{
|
||||
Type: "wrongtype",
|
||||
},
|
||||
},
|
||||
Data: []*LoaderDataType{{
|
||||
Type: "wrongtype",
|
||||
}},
|
||||
},
|
||||
}
|
||||
expected = "<LoaderS> unsupported data type wrongtype"
|
||||
@@ -166,17 +162,13 @@ func TestConfigSanityLoaders(t *testing.T) {
|
||||
Enabled: true,
|
||||
TpInDir: "/",
|
||||
TpOutDir: "/",
|
||||
Data: []*LoaderDataType{
|
||||
&LoaderDataType{
|
||||
Data: []*LoaderDataType{{
|
||||
Type: utils.MetaStats,
|
||||
Fields: []*FCTemplate{{
|
||||
Type: utils.MetaStats,
|
||||
Fields: []*FCTemplate{
|
||||
&FCTemplate{
|
||||
Type: utils.MetaStats,
|
||||
Tag: "test1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Tag: "test1",
|
||||
}},
|
||||
}},
|
||||
},
|
||||
}
|
||||
expected = "<LoaderS> invalid field type *stats for *stats at test1"
|
||||
@@ -637,38 +629,32 @@ func TestConfigSanityEventReader(t *testing.T) {
|
||||
t.Errorf("Expecting: %+q received: %+q", expected, err)
|
||||
}
|
||||
cfg.sessionSCfg.Enabled = true
|
||||
cfg.ersCfg.Readers = []*EventReaderCfg{
|
||||
&EventReaderCfg{
|
||||
ID: "test",
|
||||
Type: "wrongtype",
|
||||
},
|
||||
}
|
||||
cfg.ersCfg.Readers = []*EventReaderCfg{{
|
||||
ID: "test",
|
||||
Type: "wrongtype",
|
||||
}}
|
||||
expected = "<ERs> unsupported data type: wrongtype for reader with ID: test"
|
||||
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {
|
||||
t.Errorf("Expecting: %+q received: %+q", expected, err)
|
||||
}
|
||||
|
||||
cfg.ersCfg.Readers = []*EventReaderCfg{
|
||||
&EventReaderCfg{
|
||||
ID: "test2",
|
||||
Type: utils.MetaFileCSV,
|
||||
ProcessedPath: "not/a/path",
|
||||
},
|
||||
}
|
||||
cfg.ersCfg.Readers = []*EventReaderCfg{{
|
||||
ID: "test2",
|
||||
Type: utils.MetaFileCSV,
|
||||
ProcessedPath: "not/a/path",
|
||||
}}
|
||||
expected = "<ERs> nonexistent folder: not/a/path for reader with ID: test2"
|
||||
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {
|
||||
t.Errorf("Expecting: %+q received: %+q", expected, err)
|
||||
}
|
||||
|
||||
cfg.ersCfg.Readers = []*EventReaderCfg{
|
||||
&EventReaderCfg{
|
||||
ID: "test3",
|
||||
Type: utils.MetaFileCSV,
|
||||
ProcessedPath: "/",
|
||||
SourcePath: "/",
|
||||
FieldSep: "",
|
||||
},
|
||||
}
|
||||
cfg.ersCfg.Readers = []*EventReaderCfg{{
|
||||
ID: "test3",
|
||||
Type: utils.MetaFileCSV,
|
||||
ProcessedPath: "/",
|
||||
SourcePath: "/",
|
||||
FieldSep: "",
|
||||
}}
|
||||
expected = "<ERs> empty FieldSep for reader with ID: test3"
|
||||
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {
|
||||
t.Errorf("Expecting: %+q received: %+q", expected, err)
|
||||
@@ -728,7 +714,7 @@ func TestConfigSanityDataDB(t *testing.T) {
|
||||
|
||||
cfg.cacheCfg = &CacheCfg{
|
||||
Partitions: map[string]*CacheParamCfg{
|
||||
utils.CacheTimings: &CacheParamCfg{
|
||||
utils.CacheTimings: {
|
||||
Limit: 0,
|
||||
},
|
||||
},
|
||||
@@ -738,7 +724,7 @@ func TestConfigSanityDataDB(t *testing.T) {
|
||||
}
|
||||
cfg.cacheCfg = &CacheCfg{
|
||||
Partitions: map[string]*CacheParamCfg{
|
||||
utils.CacheAccounts: &CacheParamCfg{
|
||||
utils.CacheAccounts: {
|
||||
Limit: 1,
|
||||
},
|
||||
},
|
||||
@@ -771,7 +757,7 @@ func TestConfigSanityDataDB(t *testing.T) {
|
||||
cfg.thresholdSCfg.Enabled = false
|
||||
|
||||
cfg.dataDbCfg.Items = map[string]*ItemOpt{
|
||||
"test1": &ItemOpt{
|
||||
"test1": {
|
||||
Remote: true,
|
||||
},
|
||||
}
|
||||
@@ -781,7 +767,7 @@ func TestConfigSanityDataDB(t *testing.T) {
|
||||
}
|
||||
|
||||
cfg.dataDbCfg.Items = map[string]*ItemOpt{
|
||||
"test2": &ItemOpt{
|
||||
"test2": {
|
||||
Remote: false,
|
||||
Replicate: true,
|
||||
},
|
||||
@@ -838,12 +824,12 @@ func TestConfigSanityDispatcher(t *testing.T) {
|
||||
func TestConfigSanityCacheS(t *testing.T) {
|
||||
cfg, _ = NewDefaultCGRConfig()
|
||||
|
||||
cfg.cacheCfg.Partitions = map[string]*CacheParamCfg{"wrong_partition_name": &CacheParamCfg{Limit: 10}}
|
||||
cfg.cacheCfg.Partitions = map[string]*CacheParamCfg{"wrong_partition_name": {Limit: 10}}
|
||||
if err := cfg.checkConfigSanity(); err == nil || err.Error() != "<CacheS> partition <wrong_partition_name> not defined" {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
cfg.cacheCfg.Partitions = map[string]*CacheParamCfg{utils.CacheLoadIDs: &CacheParamCfg{Limit: 9}}
|
||||
cfg.cacheCfg.Partitions = map[string]*CacheParamCfg{utils.CacheLoadIDs: {Limit: 9}}
|
||||
if err := cfg.checkConfigSanity(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
@@ -853,7 +839,7 @@ func TestConfigSanityFilterS(t *testing.T) {
|
||||
cfg, _ = NewDefaultCGRConfig()
|
||||
cfg.filterSCfg.StatSConns = []string{utils.MetaInternal}
|
||||
|
||||
if err := cfg.checkConfigSanity(); err == nil || err.Error() != "<Stats> not enabled but requested by <FilterS> component." {
|
||||
if err := cfg.checkConfigSanity(); err == nil || err.Error() != "<Stats> not enabled but requested by <FilterS> component" {
|
||||
t.Error(err)
|
||||
}
|
||||
cfg.filterSCfg.StatSConns = []string{"test"}
|
||||
@@ -865,7 +851,7 @@ func TestConfigSanityFilterS(t *testing.T) {
|
||||
|
||||
cfg.filterSCfg.ResourceSConns = []string{utils.MetaInternal}
|
||||
|
||||
if err := cfg.checkConfigSanity(); err == nil || err.Error() != "<ResourceS> not enabled but requested by <FilterS> component." {
|
||||
if err := cfg.checkConfigSanity(); err == nil || err.Error() != "<ResourceS> not enabled but requested by <FilterS> component" {
|
||||
t.Error(err)
|
||||
}
|
||||
cfg.filterSCfg.ResourceSConns = []string{"test"}
|
||||
@@ -877,7 +863,7 @@ func TestConfigSanityFilterS(t *testing.T) {
|
||||
|
||||
cfg.filterSCfg.ApierSConns = []string{utils.MetaInternal}
|
||||
|
||||
if err := cfg.checkConfigSanity(); err == nil || err.Error() != "<ApierS> not enabled but requested by <FilterS> component." {
|
||||
if err := cfg.checkConfigSanity(); err == nil || err.Error() != "<ApierS> not enabled but requested by <FilterS> component" {
|
||||
t.Error(err)
|
||||
}
|
||||
cfg.filterSCfg.ApierSConns = []string{"test"}
|
||||
|
||||
@@ -28,12 +28,12 @@ import (
|
||||
)
|
||||
|
||||
// NewDispatcherHService constructs a DispatcherHService
|
||||
func NewDispatcherHService(dm *engine.DataManager,
|
||||
cfg *config.CGRConfig, fltrS *engine.FilterS,
|
||||
func NewDispatcherHService(cfg *config.CGRConfig,
|
||||
connMgr *engine.ConnManager) (*DispatcherHostsService, error) {
|
||||
return &DispatcherHostsService{
|
||||
cfg: cfg,
|
||||
connMgr: connMgr,
|
||||
stop: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -42,6 +42,7 @@ func NewDispatcherHService(dm *engine.DataManager,
|
||||
type DispatcherHostsService struct {
|
||||
cfg *config.CGRConfig
|
||||
connMgr *engine.ConnManager
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
// ListenAndServe will initialize the service
|
||||
@@ -52,6 +53,8 @@ func (dhS *DispatcherHostsService) ListenAndServe(exitChan chan bool) (err error
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-dhS.stop:
|
||||
return
|
||||
case e := <-exitChan:
|
||||
exitChan <- e // put back for the others listening for shutdown request
|
||||
return
|
||||
@@ -63,6 +66,8 @@ func (dhS *DispatcherHostsService) ListenAndServe(exitChan chan bool) (err error
|
||||
// Shutdown is called to shutdown the service
|
||||
func (dhS *DispatcherHostsService) Shutdown() error {
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.DispatcherH))
|
||||
dhS.unregisterHosts()
|
||||
close(dhS.stop)
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.DispatcherH))
|
||||
return nil
|
||||
}
|
||||
@@ -89,7 +94,7 @@ func (dhS *DispatcherHostsService) registerHosts() (err error) {
|
||||
dh.Conns[0] = conn
|
||||
}
|
||||
var rply string
|
||||
if err = dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1RegisterHosts, dHs, &rply); err != nil {
|
||||
if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1RegisterHosts, dHs, &rply); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s",
|
||||
utils.DispatcherH, connID, err))
|
||||
continue
|
||||
@@ -101,3 +106,22 @@ func (dhS *DispatcherHostsService) registerHosts() (err error) {
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (dhS *DispatcherHostsService) unregisterHosts() {
|
||||
var rply string
|
||||
for _, connID := range dhS.cfg.DispatcherHCfg().DispatchersConns {
|
||||
if err := dhS.connMgr.Call([]string{connID}, nil, utils.DispatcherHv1UnregisterHosts, dhS.cfg.DispatcherHCfg().HostIDs, &rply); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unable to set the hosts to the conn with ID <%s> because : %s",
|
||||
utils.DispatcherH, connID, err))
|
||||
continue
|
||||
} else if rply != utils.OK {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Unexpected reply recieved when setting the hosts: %s",
|
||||
utils.DispatcherH, rply))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (dhS *DispatcherHostsService) Call(_ string, _, _ interface{}) error {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
@@ -55,36 +55,53 @@ func register(req *http.Request) (*json.RawMessage, error) {
|
||||
utils.DispatcherH, err))
|
||||
return nil, err
|
||||
}
|
||||
if sReq.Method != utils.DispatcherHv1RegisterHosts {
|
||||
switch sReq.Method {
|
||||
default:
|
||||
err = errors.New("rpc: can't find service " + sReq.Method)
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to register hosts because: %s",
|
||||
utils.DispatcherH, err))
|
||||
return sReq.Id, err
|
||||
}
|
||||
var dHs []*engine.DispatcherHost
|
||||
params := []interface{}{dHs}
|
||||
if err = json.Unmarshal(*sReq.Params, ¶ms); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s",
|
||||
utils.DispatcherH, err))
|
||||
return sReq.Id, err
|
||||
}
|
||||
var addr string
|
||||
if addr, err = getIP(req); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s",
|
||||
utils.DispatcherH, err))
|
||||
return sReq.Id, err
|
||||
}
|
||||
|
||||
for _, dH := range dHs {
|
||||
if len(dH.Conns) != 1 { // ignore the hosts with no connections or more
|
||||
continue
|
||||
case utils.DispatcherHv1UnregisterHosts:
|
||||
var dHIDs []string
|
||||
params := []interface{}{dHIDs}
|
||||
if err = json.Unmarshal(*sReq.Params, ¶ms); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s",
|
||||
utils.DispatcherH, err))
|
||||
return sReq.Id, err
|
||||
}
|
||||
dH.Conns[0].Address = addr + dH.Conns[0].Address // the address contains the port
|
||||
if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.Tenant, dH, nil,
|
||||
false, utils.NonTransactional); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to set DispatcherHost <%s> in cache because: %s",
|
||||
utils.DispatcherH, dH.TenantID(), err))
|
||||
continue
|
||||
for _, id := range dHIDs {
|
||||
if err = engine.Cache.Remove(utils.CacheDispatcherHosts, id, false, utils.NonTransactional); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to remove DispatcherHost <%s> from cache because: %s",
|
||||
utils.DispatcherH, id, err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
case utils.DispatcherHv1RegisterHosts:
|
||||
var dHs []*engine.DispatcherHost
|
||||
params := []interface{}{dHs}
|
||||
if err = json.Unmarshal(*sReq.Params, ¶ms); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to decode params because: %s",
|
||||
utils.DispatcherH, err))
|
||||
return sReq.Id, err
|
||||
}
|
||||
var addr string
|
||||
if addr, err = getIP(req); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s",
|
||||
utils.DispatcherH, err))
|
||||
return sReq.Id, err
|
||||
}
|
||||
|
||||
for _, dH := range dHs {
|
||||
if len(dH.Conns) != 1 { // ignore the hosts with no connections or more
|
||||
continue
|
||||
}
|
||||
dH.Conns[0].Address = addr + dH.Conns[0].Address // the address contains the port
|
||||
if err = engine.Cache.Set(utils.CacheDispatcherHosts, dH.Tenant, dH, nil,
|
||||
false, utils.NonTransactional); err != nil {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to set DispatcherHost <%s> in cache because: %s",
|
||||
utils.DispatcherH, dH.TenantID(), err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return sReq.Id, nil
|
||||
|
||||
108
services/dispatcherh.go
Normal file
108
services/dispatcherh.go
Normal file
@@ -0,0 +1,108 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/dispatcherh"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// NewDispatcherHostsService returns the Dispatcher Service
|
||||
func NewDispatcherHostsService(cfg *config.CGRConfig, server *utils.Server,
|
||||
internalChan chan rpcclient.ClientConnector, connMgr *engine.ConnManager) servmanager.Service {
|
||||
return &DispatcherHostsService{
|
||||
connChan: internalChan,
|
||||
cfg: cfg,
|
||||
server: server,
|
||||
connMgr: connMgr,
|
||||
}
|
||||
}
|
||||
|
||||
// DispatcherHostsService implements Service interface
|
||||
type DispatcherHostsService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
server *utils.Server
|
||||
connMgr *engine.ConnManager
|
||||
|
||||
dspS *dispatcherh.DispatcherHostsService
|
||||
// rpc *v1.DispatcherHSv1
|
||||
connChan chan rpcclient.ClientConnector
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (dspS *DispatcherHostsService) Start() (err error) {
|
||||
if dspS.IsRunning() {
|
||||
return utils.ErrServiceAlreadyRunning
|
||||
}
|
||||
utils.Logger.Info("Starting CGRateS Dispatcher service.")
|
||||
dspS.Lock()
|
||||
defer dspS.Unlock()
|
||||
|
||||
if dspS.dspS, err = dispatcherh.NewDispatcherHService(dspS.cfg, dspS.connMgr); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherH, err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
dspS.connChan <- dspS.dspS
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (dspS *DispatcherHostsService) Reload() (err error) {
|
||||
return // for the momment nothing to reload
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (dspS *DispatcherHostsService) Shutdown() (err error) {
|
||||
dspS.Lock()
|
||||
defer dspS.Unlock()
|
||||
if err = dspS.dspS.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
dspS.dspS = nil
|
||||
// dspS.rpc = nil
|
||||
<-dspS.connChan
|
||||
return
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (dspS *DispatcherHostsService) IsRunning() bool {
|
||||
dspS.RLock()
|
||||
defer dspS.RUnlock()
|
||||
return dspS != nil && dspS.dspS != nil
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (dspS *DispatcherHostsService) ServiceName() string {
|
||||
return utils.DispatcherH
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (dspS *DispatcherHostsService) ShouldRun() bool {
|
||||
return dspS.cfg.DispatcherHCfg().Enabled
|
||||
}
|
||||
@@ -305,6 +305,10 @@ func (srvMngr *ServiceManager) handleReload() {
|
||||
if err = srvMngr.reloadService(utils.SIPAgent); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.DispatcherHJson):
|
||||
if err = srvMngr.reloadService(utils.DispatcherH); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
// handle RPC server
|
||||
}
|
||||
|
||||
@@ -449,6 +449,7 @@ const (
|
||||
MetaBlockerError = "*blocker_error"
|
||||
MetaConfig = "*config"
|
||||
MetaDispatchers = "*dispatchers"
|
||||
MetaDispatcherh = "*dispatcherh"
|
||||
MetaDispatcherHosts = "*dispatcher_hosts"
|
||||
MetaFilters = "*filters"
|
||||
MetaCDRs = "*cdrs"
|
||||
@@ -1537,6 +1538,7 @@ const (
|
||||
|
||||
// DispatcherS APIs
|
||||
const (
|
||||
DispatcherSv1 = "DispatcherSv1"
|
||||
DispatcherSv1Ping = "DispatcherSv1.Ping"
|
||||
DispatcherSv1GetProfileForEvent = "DispatcherSv1.GetProfileForEvent"
|
||||
DispatcherSv1Apier = "DispatcherSv1.Apier"
|
||||
@@ -1545,7 +1547,8 @@ const (
|
||||
|
||||
// DispatcherH APIs
|
||||
const (
|
||||
DispatcherHv1RegisterHosts = "DispatcherHv1.RegisterHosts"
|
||||
DispatcherHv1RegisterHosts = "DispatcherHv1.RegisterHosts"
|
||||
DispatcherHv1UnregisterHosts = "DispatcherHv1.UnregisterHosts"
|
||||
)
|
||||
|
||||
// RateProfile APIs
|
||||
|
||||
Reference in New Issue
Block a user