Bidirectional RPC in diameter agent for internal connections

This commit is contained in:
DanB
2018-12-29 19:39:30 +01:00
parent e92f6d3224
commit 1d2dfa1ddd
4 changed files with 90 additions and 27 deletions

View File

@@ -36,11 +36,11 @@ import (
)
func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS,
sessionS rpcclient.RpcClientConnection) (*DiameterAgent, error) {
if sessionS != nil && reflect.ValueOf(sessionS).IsNil() {
sessionS = nil
sS rpcclient.RpcClientConnection) (*DiameterAgent, error) {
if sS != nil && reflect.ValueOf(sS).IsNil() {
sS = nil
}
da := &DiameterAgent{cgrCfg: cgrCfg, filterS: filterS, sessionS: sessionS}
da := &DiameterAgent{cgrCfg: cgrCfg, filterS: filterS, sS: sS}
dictsPath := cgrCfg.DiameterAgentCfg().DictionariesPath
if len(dictsPath) != 0 {
if err := loadDictionaries(dictsPath, utils.DiameterAgent); err != nil {
@@ -61,15 +61,16 @@ func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS,
procsr.ReplyFields = tpls
}
}
//da.sS.SetClientConn(da) // pass the connection to sessionS back so we can receive the disconnects
return da, nil
}
type DiameterAgent struct {
cgrCfg *config.CGRConfig
filterS *engine.FilterS
sessionS rpcclient.RpcClientConnection // Connection towards CGR-SessionS component
sS rpcclient.RpcClientConnection // Connection towards CGR-SessionS component
aReqs int
sync.RWMutex
aReqsLck sync.RWMutex
}
// ListenAndServe is called when DiameterAgent is started, usually from within cmd/cgr-engine
@@ -160,7 +161,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
return
}
if da.cgrCfg.DiameterAgentCfg().MaxActiveReqs != -1 {
da.Lock()
da.aReqsLck.Lock()
if da.aReqs == da.cgrCfg.DiameterAgentCfg().MaxActiveReqs {
utils.Logger.Err(
fmt.Sprintf("<%s> denying request due to maximum active requests reached: %d, message: %s",
@@ -169,11 +170,11 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
return
}
da.aReqs++
da.Unlock()
da.aReqsLck.Unlock()
defer func() { // schedule decrement when returning out of function
da.Lock()
da.aReqsLck.Lock()
da.aReqs--
da.Unlock()
da.aReqsLck.Unlock()
}()
}
rply := config.NewNavigableMap(nil) // share it among different processors
@@ -266,7 +267,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor,
reqProcessor.Flags.HasKey(utils.MetaSuppliersEventCost),
*cgrEv)
var authReply sessions.V1AuthorizeReply
err = da.sessionS.Call(utils.SessionSv1AuthorizeEvent,
err = da.sS.Call(utils.SessionSv1AuthorizeEvent,
authArgs, &authReply)
if agReq.CGRReply, err = NewCGRReply(&authReply, err); err != nil {
return
@@ -279,7 +280,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor,
reqProcessor.Flags.HasKey(utils.MetaThresholds),
reqProcessor.Flags.HasKey(utils.MetaStats), *cgrEv)
var initReply sessions.V1InitSessionReply
err = da.sessionS.Call(utils.SessionSv1InitiateSession,
err = da.sS.Call(utils.SessionSv1InitiateSession,
initArgs, &initReply)
if agReq.CGRReply, err = NewCGRReply(&initReply, err); err != nil {
return
@@ -289,7 +290,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor,
reqProcessor.Flags.HasKey(utils.MetaAttributes),
reqProcessor.Flags.HasKey(utils.MetaAccounts), *cgrEv)
var updateReply sessions.V1UpdateSessionReply
err = da.sessionS.Call(utils.SessionSv1UpdateSession,
err = da.sS.Call(utils.SessionSv1UpdateSession,
updateArgs, &updateReply)
if agReq.CGRReply, err = NewCGRReply(&updateReply, err); err != nil {
return
@@ -301,7 +302,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor,
reqProcessor.Flags.HasKey(utils.MetaThresholds),
reqProcessor.Flags.HasKey(utils.MetaStats), *cgrEv)
var tRply string
err = da.sessionS.Call(utils.SessionSv1TerminateSession,
err = da.sS.Call(utils.SessionSv1TerminateSession,
terminateArgs, &tRply)
if agReq.CGRReply, err = NewCGRReply(nil, err); err != nil {
return
@@ -315,7 +316,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor,
reqProcessor.Flags.HasKey(utils.MetaStats),
*cgrEv)
var eventRply sessions.V1ProcessEventReply
err = da.sessionS.Call(utils.SessionSv1ProcessEvent,
err = da.sS.Call(utils.SessionSv1ProcessEvent,
evArgs, &eventRply)
if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
cgrEv.Event[utils.Usage] = 0 // avoid further debits
@@ -331,7 +332,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor,
if reqProcessor.Flags.HasKey(utils.MetaCDRs) &&
!reqProcessor.Flags.HasKey(utils.MetaDryRun) {
var rplyCDRs string
if err = da.sessionS.Call(utils.SessionSv1ProcessCDR,
if err = da.sS.Call(utils.SessionSv1ProcessCDR,
cgrEv, &rplyCDRs); err != nil {
agReq.CGRReply.Set([]string{utils.Error}, err.Error(), false, false)
}
@@ -353,3 +354,19 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor,
}
return true, nil
}
// rpcclient.RpcClientConnection interface
func (da *DiameterAgent) Call(serviceMethod string, args interface{}, reply interface{}) error {
return utils.RPCCall(da, serviceMethod, args, reply)
}
// V1DisconnectSession is part of the sessions.SessionSClient
func (da *DiameterAgent) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) error {
return utils.ErrNotImplemented
}
// V1GetActiveSessionIDs is part of the sessions.SessionSClient
func (da *DiameterAgent) V1GetActiveSessionIDs(ignParam string,
sessionIDs *[]*sessions.SessionID) error {
return utils.ErrNotImplemented
}

28
agents/diamagent_test.go Normal file
View File

@@ -0,0 +1,28 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"testing"
"github.com/cgrates/cgrates/sessions"
)
func TestDAsSessionSClientIface(t *testing.T) {
_ = sessions.SessionSClient(new(DiameterAgent))
}

View File

@@ -333,20 +333,33 @@ func startAsteriskAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit
exitChan <- true
}
func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection,
func startDiameterAgent(internalSsChan chan rpcclient.RpcClientConnection,
exitChan chan bool, filterSChan chan *engine.FilterS) {
var err error
utils.Logger.Info("Starting CGRateS DiameterAgent service")
filterS := <-filterSChan
filterSChan <- filterS
var smgConn *rpcclient.RpcClientPool
if len(cfg.DiameterAgentCfg().SessionSConns) != 0 {
smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
var sS rpcclient.RpcClientConnection
var sSInternal bool
if len(cfg.DiameterAgentCfg().SessionSConns) == 0 {
utils.Logger.Crit(
fmt.Sprintf("<%s> no SessionS connections defined",
utils.DiameterAgent))
exitChan <- true
return
}
if cfg.DiameterAgentCfg().SessionSConns[0].Address == utils.MetaInternal {
sSInternal = true
sSIntConn := <-internalSsChan
internalSsChan <- sSIntConn
sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SMGeneric))
} else {
sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DiameterAgentCfg().SessionSConns, internalSMGChan,
cfg.DiameterAgentCfg().SessionSConns, internalSsChan,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
@@ -355,12 +368,15 @@ func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection,
return
}
}
da, err := agents.NewDiameterAgent(cfg, filterS, smgConn)
da, err := agents.NewDiameterAgent(cfg, filterS, sS)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> error: %s!", err))
exitChan <- true
return
}
if sSInternal { // bidirectional client backwards connection
sS.(*utils.BiRPCInternalClient).SetClientConn(da)
}
if err = da.ListenAndServe(); err != nil {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> error: %s!", err))
}

View File

@@ -605,11 +605,13 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
// DAgent checks
if self.diameterAgentCfg.Enabled && !self.sessionSCfg.Enabled {
for _, daSMGConn := range self.diameterAgentCfg.SessionSConns {
if daSMGConn.Address == utils.MetaInternal {
return fmt.Errorf("%s not enabled but referenced by %s component",
utils.SessionS, utils.DiameterAgent)
if self.diameterAgentCfg.Enabled {
if !self.sessionSCfg.Enabled {
for _, daSMGConn := range self.diameterAgentCfg.SessionSConns {
if daSMGConn.Address == utils.MetaInternal {
return fmt.Errorf("%s not enabled but referenced by %s component",
utils.SessionS, utils.DiameterAgent)
}
}
}
}