diff --git a/agents/fsagent.go b/agents/fsagent.go index 0c9ecffbd..c05fc8049 100644 --- a/agents/fsagent.go +++ b/agents/fsagent.go @@ -440,7 +440,7 @@ func (fsa *FSsessions) V1GetActiveSessionIDs(ignParam string, return } -// SetSessionSConnection sets the new connection to the threshold service +// SetSessionSConnection sets the new connection to the session service // only used on reload func (sm *FSsessions) SetSessionSConnection(sS rpcclient.RpcClientConnection) { sm.sS = sS diff --git a/agents/kamagent.go b/agents/kamagent.go index 206c9772f..79ad4f4da 100644 --- a/agents/kamagent.go +++ b/agents/kamagent.go @@ -89,8 +89,15 @@ func (self *KamailioAgent) Connect() error { return err } -func (self *KamailioAgent) Shutdown() error { - return nil +func (self *KamailioAgent) Shutdown() (err error) { + for conIndx, conn := range self.conns { + if err = conn.Disconnect(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> can't disconnect connection at index %v because: %s", + utils.KamailioAgent, conIndx, err)) + continue + } + } + return } // rpcclient.RpcClientConnection interface @@ -102,7 +109,7 @@ func (ka *KamailioAgent) Call(serviceMethod string, args interface{}, reply inte func (ka *KamailioAgent) onCgrAuth(evData []byte, connIdx int) { if connIdx >= len(ka.conns) { // protection against index out of range panic err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) - utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error())) + utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error())) return } kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String()) @@ -145,7 +152,7 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connIdx int) { func (ka *KamailioAgent) onCallStart(evData []byte, connIdx int) { if connIdx >= len(ka.conns) { // protection against index out of range panic err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) - utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error())) + utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error())) return } kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String()) @@ -187,7 +194,7 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connIdx int) { func (ka *KamailioAgent) onCallEnd(evData []byte, connIdx int) { if connIdx >= len(ka.conns) { // protection against index out of range panic err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) - utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error())) + utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error())) return } kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String()) @@ -254,7 +261,7 @@ func (ka *KamailioAgent) onDlgList(evData []byte, connIdx int) { func (ka *KamailioAgent) onCgrProcessMessage(evData []byte, connIdx int) { if connIdx >= len(ka.conns) { // protection against index out of range panic err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) - utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error())) + utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error())) return } kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String()) @@ -307,7 +314,7 @@ func (ka *KamailioAgent) onCgrProcessMessage(evData []byte, connIdx int) { func (ka *KamailioAgent) onCgrProcessCDR(evData []byte, connIdx int) { if connIdx >= len(ka.conns) { // protection against index out of range panic err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) - utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error())) + utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error())) return } kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String()) @@ -375,7 +382,7 @@ func (ka *KamailioAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r } if int(connIdx) >= len(ka.conns) { // protection against index out of range panic err = fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) - utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error())) + utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error())) return } if err = ka.disconnectSession(int(connIdx), @@ -403,3 +410,15 @@ func (ka *KamailioAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*s } return } + +// SetSessionSConnection sets the new connection to the session service +// only used on reload +func (ka *KamailioAgent) SetSessionSConnection(sS rpcclient.RpcClientConnection) { + ka.sessionS = sS +} + +// Reload recreates the connection buffers +// only used on reload +func (ka *KamailioAgent) Reload() { + ka.conns = make([]*kamevapi.KamEvapi, len(ka.cfg.EvapiConns)) +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 738baeba8..8e10afe64 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -287,53 +287,6 @@ func startRadiusAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.Rp exitChan <- true } -func startKamAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { - var err error - var sS rpcclient.RpcClientConnection - var sSInternal bool - utils.Logger.Info("Starting Kamailio agent") - intSMGChan := internalSMGChan - if cfg.DispatcherSCfg().Enabled { - intSMGChan = internalDispatcherSChan - } - if !cfg.DispatcherSCfg().Enabled && cfg.KamAgentCfg().SessionSConns[0].Address == utils.MetaInternal { - sSInternal = true - sSIntConn := <-internalSMGChan - internalSMGChan <- sSIntConn - sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS)) - } else { - sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.KamAgentCfg().SessionSConns, intSMGChan, false) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", - utils.KamailioAgent, utils.SessionS, err.Error())) - exitChan <- true - return - } - } - ka := agents.NewKamailioAgent(cfg.KamAgentCfg(), sS, - utils.FirstNonEmpty(cfg.KamAgentCfg().Timezone, cfg.GeneralCfg().DefaultTimezone)) - if sSInternal { // bidirectional client backwards connection - sS.(*utils.BiRPCInternalClient).SetClientConn(ka) - var rply string - if err := sS.Call(utils.SessionSv1RegisterInternalBiJSONConn, - utils.EmptyString, &rply); err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", - utils.KamailioAgent, utils.SessionS, err.Error())) - exitChan <- true - return - } - } - if err = ka.Connect(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) - } - exitChan <- true -} - func startHTTPAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, server *utils.Server, filterSChan chan *engine.FilterS, dfltTenant string, exitChan chan bool) { filterS := <-filterSChan @@ -900,7 +853,8 @@ func main() { srvManager.AddService(chS, attrS, chrS, tS, stS, reS, supS, schS, cdrS, rals, smg, services.NewEventReaderService(), services.NewDNSAgent(), - services.NewFreeswitchAgent()) + services.NewFreeswitchAgent(), + services.NewKamailioAgent()) internalAttributeSChan := attrS.GetIntenternalChan() internalChargerSChan := chrS.GetIntenternalChan() internalThresholdSChan := tS.GetIntenternalChan() @@ -952,11 +906,6 @@ func main() { // Start CDRC components if necessary go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan) - // Start SM-Kamailio - if cfg.KamAgentCfg().Enabled { - go startKamAgent(internalSMGChan, internalDispatcherSChan, exitChan) - } - if cfg.AsteriskAgentCfg().Enabled { go startAsteriskAgent(internalSMGChan, internalDispatcherSChan, exitChan) } diff --git a/config/config.go b/config/config.go index c75f000ba..b6cd88eff 100755 --- a/config/config.go +++ b/config/config.go @@ -1196,8 +1196,11 @@ func (cfg *CGRConfig) FsAgentCfg() *FsAgentCfg { return cfg.fsAgentCfg } -func (self *CGRConfig) KamAgentCfg() *KamAgentCfg { - return self.kamAgentCfg +// KamAgentCfg returns the config for KamAgent +func (cfg *CGRConfig) KamAgentCfg() *KamAgentCfg { + cfg.lks[KamailioAgentJSN].Lock() + defer cfg.lks[KamailioAgentJSN].Unlock() + return cfg.kamAgentCfg } // ToDo: fix locking here @@ -1553,6 +1556,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) { } fallthrough case KamailioAgentJSN: + cfg.rldChans[KamailioAgentJSN] <- struct{}{} if !fall { break } diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go index 099d219de..86a401e39 100644 --- a/services/freeswitchagent.go +++ b/services/freeswitchagent.go @@ -56,7 +56,7 @@ func (fS *FreeswitchAgent) Start(sp servmanager.ServiceProvider, waitCache bool) srvSessionS, has := sp.GetService(utils.SessionS) if !has { utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", - utils.DNSAgent, utils.SessionS)) + utils.FreeSWITCHAgent, utils.SessionS)) return utils.ErrNotFound } sSIntConn := <-srvSessionS.GetIntenternalChan() @@ -105,7 +105,7 @@ func (fS *FreeswitchAgent) Reload(sp servmanager.ServiceProvider) (err error) { srvSessionS, has := sp.GetService(utils.SessionS) if !has { utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", - utils.DNSAgent, utils.SessionS)) + utils.FreeSWITCHAgent, utils.SessionS)) return utils.ErrNotFound } sSIntConn := <-srvSessionS.GetIntenternalChan() diff --git a/services/kamailioagent.go b/services/kamailioagent.go new file mode 100644 index 000000000..cb966d0a7 --- /dev/null +++ b/services/kamailioagent.go @@ -0,0 +1,179 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "strings" + "sync" + + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/sessions" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewKamailioAgent returns the Kamailio Agent +func NewKamailioAgent() servmanager.Service { + return new(KamailioAgent) +} + +// KamailioAgent implements Agent interface +type KamailioAgent struct { + sync.RWMutex + kam *agents.KamailioAgent +} + +// Start should handle the sercive start +func (kam *KamailioAgent) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { + if kam.IsRunning() { + return fmt.Errorf("service aleady running") + } + + kam.Lock() + defer kam.Unlock() + var sS rpcclient.RpcClientConnection + var sSInternal bool + utils.Logger.Info("Starting Kamailio agent") + if !sp.GetConfig().DispatcherSCfg().Enabled && sp.GetConfig().KamAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + sSInternal = true + srvSessionS, has := sp.GetService(utils.SessionS) + if !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", + utils.KamailioAgent, utils.SessionS)) + return utils.ErrNotFound + } + sSIntConn := <-srvSessionS.GetIntenternalChan() + srvSessionS.GetIntenternalChan() <- sSIntConn + sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS)) + } else { + if sS, err = sp.NewConnection(utils.SessionS, sp.GetConfig().KamAgentCfg().SessionSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.KamailioAgent, utils.SessionS, err.Error())) + return + } + } + kam.kam = agents.NewKamailioAgent(sp.GetConfig().KamAgentCfg(), sS, + utils.FirstNonEmpty(sp.GetConfig().KamAgentCfg().Timezone, sp.GetConfig().GeneralCfg().DefaultTimezone)) + if sSInternal { // bidirectional client backwards connection + sS.(*utils.BiRPCInternalClient).SetClientConn(kam.kam) + var rply string + if err = sS.Call(utils.SessionSv1RegisterInternalBiJSONConn, + utils.EmptyString, &rply); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.KamailioAgent, utils.SessionS, err.Error())) + return + } + } + go func() { + if err = kam.kam.Connect(); err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log + return + } + utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) + sp.GetExitChan() <- true + } + }() + return +} + +// GetIntenternalChan returns the internal connection chanel +func (kam *KamailioAgent) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) { + return nil +} + +// Reload handles the change of config +func (kam *KamailioAgent) Reload(sp servmanager.ServiceProvider) (err error) { + var sS rpcclient.RpcClientConnection + var sSInternal bool + if !sp.GetConfig().DispatcherSCfg().Enabled && sp.GetConfig().KamAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + sSInternal = true + srvSessionS, has := sp.GetService(utils.SessionS) + if !has { + utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>", + utils.KamailioAgent, utils.SessionS)) + return utils.ErrNotFound + } + sSIntConn := <-srvSessionS.GetIntenternalChan() + srvSessionS.GetIntenternalChan() <- sSIntConn + sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS)) + } else { + if sS, err = sp.NewConnection(utils.SessionS, sp.GetConfig().FsAgentCfg().SessionSConns); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.KamailioAgent, utils.SessionS, err.Error())) + return + } + } + if err = kam.Shutdown(); err != nil { + return + } + kam.Lock() + defer kam.Unlock() + kam.kam.SetSessionSConnection(sS) + kam.kam.Reload() + if sSInternal { // bidirectional client backwards connection + sS.(*utils.BiRPCInternalClient).SetClientConn(kam.kam) + var rply string + if err = sS.Call(utils.SessionSv1RegisterInternalBiJSONConn, + utils.EmptyString, &rply); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.KamailioAgent, utils.SessionS, err.Error())) + return + } + } + go func() { + if err = kam.kam.Connect(); err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log + return + } + utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) + sp.GetExitChan() <- true + } + }() + return +} + +// Shutdown stops the service +func (kam *KamailioAgent) Shutdown() (err error) { + kam.Lock() + defer kam.Unlock() + if err = kam.kam.Shutdown(); err != nil { + return + } + kam.kam = nil + return +} + +// GetRPCInterface returns the interface to register for server +func (kam *KamailioAgent) GetRPCInterface() interface{} { + return kam.kam +} + +// IsRunning returns if the service is running +func (kam *KamailioAgent) IsRunning() bool { + kam.RLock() + defer kam.RUnlock() + return kam != nil && kam.kam != nil +} + +// ServiceName returns the service name +func (kam *KamailioAgent) ServiceName() string { + return utils.KamailioAgent +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index c0232014a..2d600acd6 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -286,6 +286,9 @@ func (srvMngr *ServiceManager) StartServices() (err error) { if srvMngr.GetConfig().FsAgentCfg().Enabled { go srvMngr.startService(utils.FreeSWITCHAgent) } + if srvMngr.GetConfig().KamAgentCfg().Enabled { + go srvMngr.startService(utils.KamailioAgent) + } // startServer() return } @@ -370,6 +373,10 @@ func (srvMngr *ServiceManager) handleReload() { if err = srvMngr.reloadService(utils.FreeSWITCHAgent, srvMngr.GetConfig().FsAgentCfg().Enabled); err != nil { return } + case <-srvMngr.GetConfig().GetReloadChan(config.KamailioAgentJSN): + if err = srvMngr.reloadService(utils.KamailioAgent, srvMngr.GetConfig().KamAgentCfg().Enabled); err != nil { + return + } } // handle RPC server }