diff --git a/agents/diamagent.go b/agents/diamagent.go index 283a7ae11..4d4942b83 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -476,3 +476,9 @@ func (da *DiameterAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*sessions.SessionID) error { return utils.ErrNotImplemented } + +// SetSessionSConnection sets the new connection to the session service +// only used on reload +func (da *DiameterAgent) SetSessionSConnection(sS rpcclient.RpcClientConnection) { + da.sS = sS +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index fae07b297..0f1472059 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -41,7 +41,6 @@ import ( "github.com/cgrates/cgrates/loaders" "github.com/cgrates/cgrates/services" "github.com/cgrates/cgrates/servmanager" - "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) @@ -139,61 +138,6 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne } } -func startDiameterAgent(internalSsChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, - filterSChan chan *engine.FilterS, exitChan chan bool) { - var err error - utils.Logger.Info("Starting CGRateS DiameterAgent service") - filterS := <-filterSChan - filterSChan <- filterS - var sS rpcclient.RpcClientConnection - var sSInternal bool - intSsChan := internalSsChan - if cfg.DispatcherSCfg().Enabled { - intSsChan = internalDispatcherSChan - } - if !cfg.DispatcherSCfg().Enabled && cfg.DiameterAgentCfg().SessionSConns[0].Address == utils.MetaInternal { - sSInternal = true - sSIntConn := <-internalSsChan - internalSsChan <- sSIntConn - sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS)) - } else { - sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DiameterAgentCfg().SessionSConns, intSsChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", - utils.DiameterAgent, utils.SessionS, err.Error())) - exitChan <- true - return - } - } - - da, err := agents.NewDiameterAgent(cfg, filterS, sS) - if err != nil { - utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) - exitChan <- true - return - } - if sSInternal { // bidirectional client backwards connection - sS.(*utils.BiRPCInternalClient).SetClientConn(da) - var rply string - if err := sS.Call(utils.SessionSv1RegisterInternalBiJSONConn, - utils.EmptyString, &rply); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", - utils.DiameterAgent, utils.SessionS, err.Error())) - exitChan <- true - return - } - } - if err = da.ListenAndServe(); err != nil { - utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) - } - exitChan <- true -} - func startHTTPAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, server *utils.Server, filterSChan chan *engine.FilterS, dfltTenant string, exitChan chan bool) { filterS := <-filterSChan @@ -764,6 +708,7 @@ func main() { services.NewKamailioAgent(), services.NewAsteriskAgent(), // partial reload services.NewRadiusAgent(), // partial reload + services.NewDiameterAgent(), // partial reload ) internalAttributeSChan := attrS.GetIntenternalChan() internalChargerSChan := chrS.GetIntenternalChan() @@ -816,10 +761,6 @@ func main() { // Start CDRC components if necessary go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan) - if cfg.DiameterAgentCfg().Enabled { - go startDiameterAgent(internalSMGChan, internalDispatcherSChan, filterSChan, exitChan) - } - if len(cfg.HttpAgentCfg()) != 0 { go startHTTPAgent(internalSMGChan, internalDispatcherSChan, server, filterSChan, cfg.GeneralCfg().DefaultTenant, exitChan) diff --git a/config/config.go b/config/config.go index 8929d4ff8..c66e51421 100755 --- a/config/config.go +++ b/config/config.go @@ -1565,12 +1565,13 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case DA_JSN: + cfg.rldChans[DA_JSN] <- struct{}{} if !fall { break } fallthrough case RA_JSN: - cfg.rldChans[RALS_JSN] <- struct{}{} + cfg.rldChans[RA_JSN] <- struct{}{} if !fall { break } diff --git a/services/asteriskagent.go b/services/asteriskagent.go index b545531bc..4093482fd 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -124,9 +124,6 @@ func (ast *AsteriskAgent) Reload(sp servmanager.ServiceProvider) (err error) { return } } - if err = ast.Shutdown(); err != nil { - return - } ast.Lock() defer ast.Unlock() for _, conn := range ast.smas { diff --git a/services/diameteragent.go b/services/diameteragent.go new file mode 100644 index 000000000..60d7873b0 --- /dev/null +++ b/services/diameteragent.go @@ -0,0 +1,163 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/sessions" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewDiameterAgent returns the Diameter Agent +func NewDiameterAgent() servmanager.Service { + return new(DiameterAgent) +} + +// DiameterAgent implements Agent interface +type DiameterAgent struct { + sync.RWMutex + da *agents.DiameterAgent +} + +// Start should handle the sercive start +func (da *DiameterAgent) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { + if da.IsRunning() { + return fmt.Errorf("service aleady running") + } + + da.Lock() + defer da.Unlock() + var sS rpcclient.RpcClientConnection + var sSInternal bool + utils.Logger.Info("Starting Diameter agent") + if !sp.GetConfig().DispatcherSCfg().Enabled && sp.GetConfig().DiameterAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + sSInternal = true + srvSessionS, has := sp.GetService(utils.SessionS) + if !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", + utils.DiameterAgent, utils.SessionS)) + return utils.ErrNotFound + } + sSIntConn := <-srvSessionS.GetIntenternalChan() + srvSessionS.GetIntenternalChan() <- sSIntConn + sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS)) + } else { + if sS, err = sp.NewConnection(utils.SessionS, sp.GetConfig().DiameterAgentCfg().SessionSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.DiameterAgent, utils.SessionS, err.Error())) + return + } + } + utils.Logger.Info("Starting CGRateS DiameterAgent service") + + da.da, err = agents.NewDiameterAgent(sp.GetConfig(), sp.GetFilterS(), sS) + if err != nil { + utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) + return + } + if sSInternal { // bidirectional client backwards connection + sS.(*utils.BiRPCInternalClient).SetClientConn(da.da) + var rply string + if err = sS.Call(utils.SessionSv1RegisterInternalBiJSONConn, + utils.EmptyString, &rply); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.DiameterAgent, utils.SessionS, err.Error())) + return + } + } + + go func() { + if err = da.da.ListenAndServe(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", + utils.DiameterAgent, err)) + } + sp.GetExitChan() <- true + }() + return +} + +// GetIntenternalChan returns the internal connection chanel +func (da *DiameterAgent) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return nil +} + +// Reload handles the change of config +func (da *DiameterAgent) Reload(sp servmanager.ServiceProvider) (err error) { + var sS rpcclient.RpcClientConnection + var sSInternal bool + if !sp.GetConfig().DispatcherSCfg().Enabled && sp.GetConfig().DiameterAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + sSInternal = true + srvSessionS, has := sp.GetService(utils.SessionS) + if !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", + utils.DiameterAgent, utils.SessionS)) + return utils.ErrNotFound + } + sSIntConn := <-srvSessionS.GetIntenternalChan() + srvSessionS.GetIntenternalChan() <- sSIntConn + sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS)) + } else { + if sS, err = sp.NewConnection(utils.SessionS, sp.GetConfig().DiameterAgentCfg().SessionSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.DiameterAgent, utils.SessionS, err.Error())) + return + } + } + da.Lock() + defer da.Unlock() + // da.da.SetSessionSConnection(sS) + if sSInternal { // bidirectional client backwards connection + sS.(*utils.BiRPCInternalClient).SetClientConn(da.da) + var rply string + if err = sS.Call(utils.SessionSv1RegisterInternalBiJSONConn, + utils.EmptyString, &rply); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.DiameterAgent, utils.SessionS, err.Error())) + return + } + } + return // partial reload +} + +// Shutdown stops the service +func (da *DiameterAgent) Shutdown() (err error) { + return // no shutdown for the momment +} + +// GetRPCInterface returns the interface to register for server +func (da *DiameterAgent) GetRPCInterface() interface{} { + return da.da +} + +// IsRunning returns if the service is running +func (da *DiameterAgent) IsRunning() bool { + da.RLock() + defer da.RUnlock() + return da != nil && da.da != nil +} + +// ServiceName returns the service name +func (da *DiameterAgent) ServiceName() string { + return utils.DiameterAgent +} diff --git a/services/radiusagent.go b/services/radiusagent.go index f73f4723d..904c040a1 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -83,8 +83,8 @@ func (rad *RadiusAgent) Reload(sp servmanager.ServiceProvider) (err error) { return } rad.Lock() - defer rad.Unlock() rad.rad.SetSessionSConnection(smgConn) + rad.Unlock() return // partial reload } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index ac548e243..292ce5020 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -295,6 +295,9 @@ func (srvMngr *ServiceManager) StartServices() (err error) { if srvMngr.GetConfig().RadiusAgentCfg().Enabled { go srvMngr.startService(utils.RadiusAgent) } + if srvMngr.GetConfig().DiameterAgentCfg().Enabled { + go srvMngr.startService(utils.DiameterAgent) + } // startServer() return } @@ -391,6 +394,10 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(utils.RadiusAgent, srvMngr.GetConfig().RadiusAgentCfg().Enabled); err != nil { return } + case <-srvMngr.GetConfig().GetReloadChan(config.DA_JSN): + if err = srvMngr.reloadService(utils.DiameterAgent, srvMngr.GetConfig().DiameterAgentCfg().Enabled); err != nil { + return + } } // handle RPC server }