mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 05:09:54 +05:00
Added KamailioAgent as service in ServiceManager
This commit is contained in:
committed by
Dan Christian Bogos
parent
e1b0b65730
commit
1f106a62ce
@@ -440,7 +440,7 @@ func (fsa *FSsessions) V1GetActiveSessionIDs(ignParam string,
|
||||
return
|
||||
}
|
||||
|
||||
// SetSessionSConnection sets the new connection to the threshold service
|
||||
// SetSessionSConnection sets the new connection to the session service
|
||||
// only used on reload
|
||||
func (sm *FSsessions) SetSessionSConnection(sS rpcclient.RpcClientConnection) {
|
||||
sm.sS = sS
|
||||
|
||||
@@ -89,8 +89,15 @@ func (self *KamailioAgent) Connect() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (self *KamailioAgent) Shutdown() error {
|
||||
return nil
|
||||
func (self *KamailioAgent) Shutdown() (err error) {
|
||||
for conIndx, conn := range self.conns {
|
||||
if err = conn.Disconnect(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> can't disconnect connection at index %v because: %s",
|
||||
utils.KamailioAgent, conIndx, err))
|
||||
continue
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// rpcclient.RpcClientConnection interface
|
||||
@@ -102,7 +109,7 @@ func (ka *KamailioAgent) Call(serviceMethod string, args interface{}, reply inte
|
||||
func (ka *KamailioAgent) onCgrAuth(evData []byte, connIdx int) {
|
||||
if connIdx >= len(ka.conns) { // protection against index out of range panic
|
||||
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error()))
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))
|
||||
return
|
||||
}
|
||||
kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String())
|
||||
@@ -145,7 +152,7 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connIdx int) {
|
||||
func (ka *KamailioAgent) onCallStart(evData []byte, connIdx int) {
|
||||
if connIdx >= len(ka.conns) { // protection against index out of range panic
|
||||
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error()))
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))
|
||||
return
|
||||
}
|
||||
kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String())
|
||||
@@ -187,7 +194,7 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connIdx int) {
|
||||
func (ka *KamailioAgent) onCallEnd(evData []byte, connIdx int) {
|
||||
if connIdx >= len(ka.conns) { // protection against index out of range panic
|
||||
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error()))
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))
|
||||
return
|
||||
}
|
||||
kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String())
|
||||
@@ -254,7 +261,7 @@ func (ka *KamailioAgent) onDlgList(evData []byte, connIdx int) {
|
||||
func (ka *KamailioAgent) onCgrProcessMessage(evData []byte, connIdx int) {
|
||||
if connIdx >= len(ka.conns) { // protection against index out of range panic
|
||||
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error()))
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))
|
||||
return
|
||||
}
|
||||
kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String())
|
||||
@@ -307,7 +314,7 @@ func (ka *KamailioAgent) onCgrProcessMessage(evData []byte, connIdx int) {
|
||||
func (ka *KamailioAgent) onCgrProcessCDR(evData []byte, connIdx int) {
|
||||
if connIdx >= len(ka.conns) { // protection against index out of range panic
|
||||
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error()))
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))
|
||||
return
|
||||
}
|
||||
kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String())
|
||||
@@ -375,7 +382,7 @@ func (ka *KamailioAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r
|
||||
}
|
||||
if int(connIdx) >= len(ka.conns) { // protection against index out of range panic
|
||||
err = fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error()))
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.KamailioAgent, err.Error()))
|
||||
return
|
||||
}
|
||||
if err = ka.disconnectSession(int(connIdx),
|
||||
@@ -403,3 +410,15 @@ func (ka *KamailioAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*s
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// SetSessionSConnection sets the new connection to the session service
|
||||
// only used on reload
|
||||
func (ka *KamailioAgent) SetSessionSConnection(sS rpcclient.RpcClientConnection) {
|
||||
ka.sessionS = sS
|
||||
}
|
||||
|
||||
// Reload recreates the connection buffers
|
||||
// only used on reload
|
||||
func (ka *KamailioAgent) Reload() {
|
||||
ka.conns = make([]*kamevapi.KamEvapi, len(ka.cfg.EvapiConns))
|
||||
}
|
||||
|
||||
@@ -287,53 +287,6 @@ func startRadiusAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.Rp
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func startKamAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
|
||||
var err error
|
||||
var sS rpcclient.RpcClientConnection
|
||||
var sSInternal bool
|
||||
utils.Logger.Info("Starting Kamailio agent")
|
||||
intSMGChan := internalSMGChan
|
||||
if cfg.DispatcherSCfg().Enabled {
|
||||
intSMGChan = internalDispatcherSChan
|
||||
}
|
||||
if !cfg.DispatcherSCfg().Enabled && cfg.KamAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
|
||||
sSInternal = true
|
||||
sSIntConn := <-internalSMGChan
|
||||
internalSMGChan <- sSIntConn
|
||||
sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS))
|
||||
} else {
|
||||
sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
|
||||
cfg.TlsCfg().ClientKey,
|
||||
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
|
||||
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
|
||||
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
|
||||
cfg.KamAgentCfg().SessionSConns, intSMGChan, false)
|
||||
if err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
|
||||
utils.KamailioAgent, utils.SessionS, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
ka := agents.NewKamailioAgent(cfg.KamAgentCfg(), sS,
|
||||
utils.FirstNonEmpty(cfg.KamAgentCfg().Timezone, cfg.GeneralCfg().DefaultTimezone))
|
||||
if sSInternal { // bidirectional client backwards connection
|
||||
sS.(*utils.BiRPCInternalClient).SetClientConn(ka)
|
||||
var rply string
|
||||
if err := sS.Call(utils.SessionSv1RegisterInternalBiJSONConn,
|
||||
utils.EmptyString, &rply); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
|
||||
utils.KamailioAgent, utils.SessionS, err.Error()))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
}
|
||||
if err = ka.Connect(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err))
|
||||
}
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func startHTTPAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
|
||||
server *utils.Server, filterSChan chan *engine.FilterS, dfltTenant string, exitChan chan bool) {
|
||||
filterS := <-filterSChan
|
||||
@@ -900,7 +853,8 @@ func main() {
|
||||
srvManager.AddService(chS, attrS, chrS, tS, stS, reS, supS, schS, cdrS, rals, smg,
|
||||
services.NewEventReaderService(),
|
||||
services.NewDNSAgent(),
|
||||
services.NewFreeswitchAgent())
|
||||
services.NewFreeswitchAgent(),
|
||||
services.NewKamailioAgent())
|
||||
internalAttributeSChan := attrS.GetIntenternalChan()
|
||||
internalChargerSChan := chrS.GetIntenternalChan()
|
||||
internalThresholdSChan := tS.GetIntenternalChan()
|
||||
@@ -952,11 +906,6 @@ func main() {
|
||||
// Start CDRC components if necessary
|
||||
go startCdrcs(internalCdrSChan, internalRaterChan, internalDispatcherSChan, filterSChan, exitChan)
|
||||
|
||||
// Start SM-Kamailio
|
||||
if cfg.KamAgentCfg().Enabled {
|
||||
go startKamAgent(internalSMGChan, internalDispatcherSChan, exitChan)
|
||||
}
|
||||
|
||||
if cfg.AsteriskAgentCfg().Enabled {
|
||||
go startAsteriskAgent(internalSMGChan, internalDispatcherSChan, exitChan)
|
||||
}
|
||||
|
||||
@@ -1196,8 +1196,11 @@ func (cfg *CGRConfig) FsAgentCfg() *FsAgentCfg {
|
||||
return cfg.fsAgentCfg
|
||||
}
|
||||
|
||||
func (self *CGRConfig) KamAgentCfg() *KamAgentCfg {
|
||||
return self.kamAgentCfg
|
||||
// KamAgentCfg returns the config for KamAgent
|
||||
func (cfg *CGRConfig) KamAgentCfg() *KamAgentCfg {
|
||||
cfg.lks[KamailioAgentJSN].Lock()
|
||||
defer cfg.lks[KamailioAgentJSN].Unlock()
|
||||
return cfg.kamAgentCfg
|
||||
}
|
||||
|
||||
// ToDo: fix locking here
|
||||
@@ -1553,6 +1556,7 @@ func (cfg *CGRConfig) reloadSection(section string) (err error) {
|
||||
}
|
||||
fallthrough
|
||||
case KamailioAgentJSN:
|
||||
cfg.rldChans[KamailioAgentJSN] <- struct{}{}
|
||||
if !fall {
|
||||
break
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ func (fS *FreeswitchAgent) Start(sp servmanager.ServiceProvider, waitCache bool)
|
||||
srvSessionS, has := sp.GetService(utils.SessionS)
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>",
|
||||
utils.DNSAgent, utils.SessionS))
|
||||
utils.FreeSWITCHAgent, utils.SessionS))
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
sSIntConn := <-srvSessionS.GetIntenternalChan()
|
||||
@@ -105,7 +105,7 @@ func (fS *FreeswitchAgent) Reload(sp servmanager.ServiceProvider) (err error) {
|
||||
srvSessionS, has := sp.GetService(utils.SessionS)
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>",
|
||||
utils.DNSAgent, utils.SessionS))
|
||||
utils.FreeSWITCHAgent, utils.SessionS))
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
sSIntConn := <-srvSessionS.GetIntenternalChan()
|
||||
|
||||
179
services/kamailioagent.go
Normal file
179
services/kamailioagent.go
Normal file
@@ -0,0 +1,179 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/agents"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/sessions"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// NewKamailioAgent returns the Kamailio Agent
|
||||
func NewKamailioAgent() servmanager.Service {
|
||||
return new(KamailioAgent)
|
||||
}
|
||||
|
||||
// KamailioAgent implements Agent interface
|
||||
type KamailioAgent struct {
|
||||
sync.RWMutex
|
||||
kam *agents.KamailioAgent
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (kam *KamailioAgent) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
|
||||
if kam.IsRunning() {
|
||||
return fmt.Errorf("service aleady running")
|
||||
}
|
||||
|
||||
kam.Lock()
|
||||
defer kam.Unlock()
|
||||
var sS rpcclient.RpcClientConnection
|
||||
var sSInternal bool
|
||||
utils.Logger.Info("Starting Kamailio agent")
|
||||
if !sp.GetConfig().DispatcherSCfg().Enabled && sp.GetConfig().KamAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
|
||||
sSInternal = true
|
||||
srvSessionS, has := sp.GetService(utils.SessionS)
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>",
|
||||
utils.KamailioAgent, utils.SessionS))
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
sSIntConn := <-srvSessionS.GetIntenternalChan()
|
||||
srvSessionS.GetIntenternalChan() <- sSIntConn
|
||||
sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS))
|
||||
} else {
|
||||
if sS, err = sp.NewConnection(utils.SessionS, sp.GetConfig().KamAgentCfg().SessionSConns); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
|
||||
utils.KamailioAgent, utils.SessionS, err.Error()))
|
||||
return
|
||||
}
|
||||
}
|
||||
kam.kam = agents.NewKamailioAgent(sp.GetConfig().KamAgentCfg(), sS,
|
||||
utils.FirstNonEmpty(sp.GetConfig().KamAgentCfg().Timezone, sp.GetConfig().GeneralCfg().DefaultTimezone))
|
||||
if sSInternal { // bidirectional client backwards connection
|
||||
sS.(*utils.BiRPCInternalClient).SetClientConn(kam.kam)
|
||||
var rply string
|
||||
if err = sS.Call(utils.SessionSv1RegisterInternalBiJSONConn,
|
||||
utils.EmptyString, &rply); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
|
||||
utils.KamailioAgent, utils.SessionS, err.Error()))
|
||||
return
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
if err = kam.kam.Connect(); err != nil {
|
||||
if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log
|
||||
return
|
||||
}
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err))
|
||||
sp.GetExitChan() <- true
|
||||
}
|
||||
}()
|
||||
return
|
||||
}
|
||||
|
||||
// GetIntenternalChan returns the internal connection chanel
|
||||
func (kam *KamailioAgent) GetIntenternalChan() (conn chan rpcclient.RpcClientConnection) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (kam *KamailioAgent) Reload(sp servmanager.ServiceProvider) (err error) {
|
||||
var sS rpcclient.RpcClientConnection
|
||||
var sSInternal bool
|
||||
if !sp.GetConfig().DispatcherSCfg().Enabled && sp.GetConfig().KamAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
|
||||
sSInternal = true
|
||||
srvSessionS, has := sp.GetService(utils.SessionS)
|
||||
if !has {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> Failed to find needed subsystem <%s>",
|
||||
utils.KamailioAgent, utils.SessionS))
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
sSIntConn := <-srvSessionS.GetIntenternalChan()
|
||||
srvSessionS.GetIntenternalChan() <- sSIntConn
|
||||
sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS))
|
||||
} else {
|
||||
if sS, err = sp.NewConnection(utils.SessionS, sp.GetConfig().FsAgentCfg().SessionSConns); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
|
||||
utils.KamailioAgent, utils.SessionS, err.Error()))
|
||||
return
|
||||
}
|
||||
}
|
||||
if err = kam.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
kam.Lock()
|
||||
defer kam.Unlock()
|
||||
kam.kam.SetSessionSConnection(sS)
|
||||
kam.kam.Reload()
|
||||
if sSInternal { // bidirectional client backwards connection
|
||||
sS.(*utils.BiRPCInternalClient).SetClientConn(kam.kam)
|
||||
var rply string
|
||||
if err = sS.Call(utils.SessionSv1RegisterInternalBiJSONConn,
|
||||
utils.EmptyString, &rply); err != nil {
|
||||
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
|
||||
utils.KamailioAgent, utils.SessionS, err.Error()))
|
||||
return
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
if err = kam.kam.Connect(); err != nil {
|
||||
if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log
|
||||
return
|
||||
}
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err))
|
||||
sp.GetExitChan() <- true
|
||||
}
|
||||
}()
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (kam *KamailioAgent) Shutdown() (err error) {
|
||||
kam.Lock()
|
||||
defer kam.Unlock()
|
||||
if err = kam.kam.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
kam.kam = nil
|
||||
return
|
||||
}
|
||||
|
||||
// GetRPCInterface returns the interface to register for server
|
||||
func (kam *KamailioAgent) GetRPCInterface() interface{} {
|
||||
return kam.kam
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (kam *KamailioAgent) IsRunning() bool {
|
||||
kam.RLock()
|
||||
defer kam.RUnlock()
|
||||
return kam != nil && kam.kam != nil
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (kam *KamailioAgent) ServiceName() string {
|
||||
return utils.KamailioAgent
|
||||
}
|
||||
@@ -286,6 +286,9 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
|
||||
if srvMngr.GetConfig().FsAgentCfg().Enabled {
|
||||
go srvMngr.startService(utils.FreeSWITCHAgent)
|
||||
}
|
||||
if srvMngr.GetConfig().KamAgentCfg().Enabled {
|
||||
go srvMngr.startService(utils.KamailioAgent)
|
||||
}
|
||||
// startServer()
|
||||
return
|
||||
}
|
||||
@@ -370,6 +373,10 @@ func (srvMngr *ServiceManager) handleReload() {
|
||||
if err = srvMngr.reloadService(utils.FreeSWITCHAgent, srvMngr.GetConfig().FsAgentCfg().Enabled); err != nil {
|
||||
return
|
||||
}
|
||||
case <-srvMngr.GetConfig().GetReloadChan(config.KamailioAgentJSN):
|
||||
if err = srvMngr.reloadService(utils.KamailioAgent, srvMngr.GetConfig().KamAgentCfg().Enabled); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
// handle RPC server
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user