Updated AsteriskAgent

This commit is contained in:
Tripon Alexandru-Ionut
2019-04-15 10:29:23 +03:00
committed by Dan Christian Bogos
parent 0f7088ff3a
commit 531537e3d4
2 changed files with 57 additions and 16 deletions

View File

@@ -32,6 +32,7 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
const (
@@ -56,20 +57,20 @@ const (
)
func NewAsteriskAgent(cgrCfg *config.CGRConfig, astConnIdx int,
smgConn *utils.BiRPCInternalClient) (*AsteriskAgent, error) {
smgConn rpcclient.RpcClientConnection) (*AsteriskAgent, error) {
sma := &AsteriskAgent{
cgrCfg: cgrCfg,
astConnIdx: astConnIdx,
smg: smgConn,
eventsCache: make(map[string]*utils.CGREventWithArgDispatcher),
}
sma.smg.SetClientConn(sma) // pass the connection to SMA back into smg so we can receive the disconnects
return sma, nil
}
type AsteriskAgent struct {
cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple
astConnIdx int
smg *utils.BiRPCInternalClient
smg rpcclient.RpcClientConnection
astConn *aringo.ARInGO
astEvChan chan map[string]interface{}
astErrChan chan error

View File

@@ -315,27 +315,67 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
}
}
func startAsteriskAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
func startAsteriskAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
var err error
var sS rpcclient.RpcClientConnection
var sSInternal bool
utils.Logger.Info("Starting Asterisk agent")
smgRpcConn := <-internalSMGChan
internalSMGChan <- smgRpcConn
birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessions.SessionS))
var reply string
if cfg.DispatcherSCfg().Enabled {
sS = <-internalDispatcherSChan
internalDispatcherSChan <- sS
} else if len(cfg.AsteriskAgentCfg().SessionSConns) == 0 {
utils.Logger.Crit(
fmt.Sprintf("<%s> no SessionS connections defined",
utils.AsteriskAgent))
exitChan <- true
return
} else if cfg.AsteriskAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
sSInternal = true
sSIntConn := <-internalSMGChan
internalSMGChan <- sSIntConn
sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS))
} else {
sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.AsteriskAgentCfg().SessionSConns, internalSMGChan,
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.AsteriskAgent, utils.SessionS, err.Error()))
exitChan <- true
return
}
}
listenAndServe := func(sma *agents.AsteriskAgent, exitChan chan bool) {
if err = sma.ListenAndServe(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> runtime error: %s!", utils.AsteriskAgent, err))
}
exitChan <- true
}
for connIdx := range cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers
sma, err := agents.NewAsteriskAgent(cfg, connIdx, birpcClnt)
sma, err := agents.NewAsteriskAgent(cfg, connIdx, sS)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.AsteriskAgent, err))
exitChan <- true
return
}
if err := birpcClnt.Call(utils.SessionSv1RegisterInternalBiJSONConn, "", &reply); err != nil { // for session sync
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.AsteriskAgent, err))
}
if err = sma.ListenAndServe(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> runtime error: %s!", utils.AsteriskAgent, err))
if sSInternal { // bidirectional client backwards connection
sS.(*utils.BiRPCInternalClient).SetClientConn(sma)
var rply string
if err := sS.Call(utils.SessionSv1RegisterInternalBiJSONConn,
utils.EmptyString, &rply); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.AsteriskAgent, utils.SessionS, err.Error()))
exitChan <- true
return
}
}
go listenAndServe(sma, exitChan)
}
exitChan <- true
}
func startDiameterAgent(internalSsChan, internalDispatcherSChan chan rpcclient.RpcClientConnection,
@@ -1614,7 +1654,7 @@ func main() {
}
if cfg.AsteriskAgentCfg().Enabled {
go startAsteriskAgent(internalSMGChan, exitChan)
go startAsteriskAgent(internalSMGChan, internalDispatcherSChan, exitChan)
}
if cfg.DiameterAgentCfg().Enabled {