diff --git a/agents/astagent.go b/agents/astagent.go index a701478cd..802c5a27d 100644 --- a/agents/astagent.go +++ b/agents/astagent.go @@ -32,6 +32,7 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) const ( @@ -56,20 +57,20 @@ const ( ) func NewAsteriskAgent(cgrCfg *config.CGRConfig, astConnIdx int, - smgConn *utils.BiRPCInternalClient) (*AsteriskAgent, error) { + smgConn rpcclient.RpcClientConnection) (*AsteriskAgent, error) { sma := &AsteriskAgent{ cgrCfg: cgrCfg, + astConnIdx: astConnIdx, smg: smgConn, eventsCache: make(map[string]*utils.CGREventWithArgDispatcher), } - sma.smg.SetClientConn(sma) // pass the connection to SMA back into smg so we can receive the disconnects return sma, nil } type AsteriskAgent struct { cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple astConnIdx int - smg *utils.BiRPCInternalClient + smg rpcclient.RpcClientConnection astConn *aringo.ARInGO astEvChan chan map[string]interface{} astErrChan chan error diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index c3e18c856..ac040bbc7 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -315,27 +315,67 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in } } -func startAsteriskAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitChan chan bool) { +func startAsteriskAgent(internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { + var err error + var sS rpcclient.RpcClientConnection + var sSInternal bool utils.Logger.Info("Starting Asterisk agent") - smgRpcConn := <-internalSMGChan - internalSMGChan <- smgRpcConn - birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessions.SessionS)) - var reply string + if cfg.DispatcherSCfg().Enabled { + sS = <-internalDispatcherSChan + internalDispatcherSChan <- sS + } else if len(cfg.AsteriskAgentCfg().SessionSConns) == 0 { + utils.Logger.Crit( + fmt.Sprintf("<%s> no SessionS connections defined", + utils.AsteriskAgent)) + exitChan <- true + return + } else if cfg.AsteriskAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + sSInternal = true + sSIntConn := <-internalSMGChan + internalSMGChan <- sSIntConn + sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SessionS)) + } else { + sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, + cfg.TlsCfg().ClientKey, + cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, + cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, + cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, + cfg.AsteriskAgentCfg().SessionSConns, internalSMGChan, + cfg.GeneralCfg().InternalTtl, false) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.AsteriskAgent, utils.SessionS, err.Error())) + exitChan <- true + return + } + } + + listenAndServe := func(sma *agents.AsteriskAgent, exitChan chan bool) { + if err = sma.ListenAndServe(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> runtime error: %s!", utils.AsteriskAgent, err)) + } + exitChan <- true + } for connIdx := range cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers - sma, err := agents.NewAsteriskAgent(cfg, connIdx, birpcClnt) + sma, err := agents.NewAsteriskAgent(cfg, connIdx, sS) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.AsteriskAgent, err)) exitChan <- true return } - if err := birpcClnt.Call(utils.SessionSv1RegisterInternalBiJSONConn, "", &reply); err != nil { // for session sync - utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.AsteriskAgent, err)) - } - if err = sma.ListenAndServe(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> runtime error: %s!", utils.AsteriskAgent, err)) + if sSInternal { // bidirectional client backwards connection + sS.(*utils.BiRPCInternalClient).SetClientConn(sma) + var rply string + if err := sS.Call(utils.SessionSv1RegisterInternalBiJSONConn, + utils.EmptyString, &rply); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.AsteriskAgent, utils.SessionS, err.Error())) + exitChan <- true + return + } } + go listenAndServe(sma, exitChan) } - exitChan <- true } func startDiameterAgent(internalSsChan, internalDispatcherSChan chan rpcclient.RpcClientConnection, @@ -1614,7 +1654,7 @@ func main() { } if cfg.AsteriskAgentCfg().Enabled { - go startAsteriskAgent(internalSMGChan, exitChan) + go startAsteriskAgent(internalSMGChan, internalDispatcherSChan, exitChan) } if cfg.DiameterAgentCfg().Enabled {