Updated HTTP Agent

This commit is contained in:
Trial97
2019-10-04 13:27:19 +03:00
committed by Dan Christian Bogos
parent 795ba93c95
commit 878aa0825b
3 changed files with 42 additions and 27 deletions

View File

@@ -715,10 +715,8 @@ func main() {
services.NewAsteriskAgent(cfg, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), // partial reload
services.NewRadiusAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), // partial reload
services.NewDiameterAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, exitChan), // partial reload
services.NewHTTPAgent(cfg, filterSChan, smg.GetIntenternalChan(), internalDispatcherSChan, server), // no reload
)
/*
services.NewHTTPAgent(), // no reload
*/
srvManager.StartServices()

View File

@@ -23,40 +23,59 @@ import (
"sync"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewHTTPAgent returns the HTTP Agent
func NewHTTPAgent() servmanager.Service {
return new(HTTPAgent)
func NewHTTPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
sSChan, dispatcherChan chan rpcclient.RpcClientConnection,
server *utils.Server) servmanager.Service {
return &HTTPAgent{
cfg: cfg,
filterSChan: filterSChan,
sSChan: sSChan,
dispatcherChan: dispatcherChan,
server: server,
}
}
// HTTPAgent implements Agent interface
type HTTPAgent struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
sSChan chan rpcclient.RpcClientConnection
dispatcherChan chan rpcclient.RpcClientConnection
server *utils.Server
ha *agents.HTTPAgent
}
// Start should handle the sercive start
func (ha *HTTPAgent) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
func (ha *HTTPAgent) Start() (err error) {
if ha.IsRunning() {
return fmt.Errorf("service aleady running")
}
filterS := <-ha.filterSChan
ha.filterSChan <- filterS
ha.Lock()
defer ha.Unlock()
utils.Logger.Info("Starting HTTP agent")
for _, agntCfg := range sp.GetConfig().HttpAgentCfg() {
for _, agntCfg := range ha.cfg.HttpAgentCfg() {
var sS rpcclient.RpcClientConnection
if sS, err = sp.NewConnection(utils.SessionS, agntCfg.SessionSConns); err != nil {
if sS, err = NewConnection(ha.cfg, ha.sSChan, ha.dispatcherChan, agntCfg.SessionSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> could not connect to %s, error: %s",
utils.HTTPAgent, utils.SessionS, err.Error()))
return
}
sp.GetServer().RegisterHttpHandler(agntCfg.Url,
agents.NewHTTPAgent(sS, sp.GetFilterS(), sp.GetConfig().GeneralCfg().DefaultTenant, agntCfg.RequestPayload,
ha.server.RegisterHttpHandler(agntCfg.Url,
agents.NewHTTPAgent(sS, filterS, ha.cfg.GeneralCfg().DefaultTenant, agntCfg.RequestPayload,
agntCfg.ReplyPayload, agntCfg.RequestProcessors))
}
return
@@ -68,7 +87,7 @@ func (ha *HTTPAgent) GetIntenternalChan() (conn chan rpcclient.RpcClientConnecti
}
// Reload handles the change of config
func (ha *HTTPAgent) Reload(sp servmanager.ServiceProvider) (err error) {
func (ha *HTTPAgent) Reload() (err error) {
return // no reload
}
@@ -77,11 +96,6 @@ func (ha *HTTPAgent) Shutdown() (err error) {
return // no shutdown for the momment
}
// GetRPCInterface returns the interface to register for server
func (ha *HTTPAgent) GetRPCInterface() interface{} {
return ha.ha
}
// IsRunning returns if the service is running
func (ha *HTTPAgent) IsRunning() bool {
ha.RLock()
@@ -93,3 +107,8 @@ func (ha *HTTPAgent) IsRunning() bool {
func (ha *HTTPAgent) ServiceName() string {
return utils.HTTPAgent
}
// ShouldRun returns if the service should be running
func (ha *HTTPAgent) ShouldRun() bool {
return len(ha.cfg.HttpAgentCfg()) != 0
}

View File

@@ -291,11 +291,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
}
if srvMngr.GetConfig().DiameterAgentCfg().Enabled {
go srvMngr.startService(utils.DiameterAgent)
} /*
if len(srvMngr.GetConfig().HttpAgentCfg()) != 0 {
go srvMngr.startService(utils.HTTPAgent)
}
*/
}
if len(srvMngr.GetConfig().HttpAgentCfg()) != 0 {
go srvMngr.startService(utils.HTTPAgent)
}
// startServer()
return
}
@@ -399,12 +398,11 @@ func (srvMngr *ServiceManager) handleReload() {
case <-srvMngr.GetConfig().GetReloadChan(config.DA_JSN):
if err = srvMngr.reloadService(utils.DiameterAgent); err != nil {
return
} /*
case <-srvMngr.GetConfig().GetReloadChan(config.HttpAgentJson):
if err = srvMngr.reloadService(utils.HTTPAgent); err != nil {
return
}
*/
}
case <-srvMngr.GetConfig().GetReloadChan(config.HttpAgentJson):
if err = srvMngr.reloadService(utils.HTTPAgent); err != nil {
return
}
}
// handle RPC server
}