diff --git a/agents/diamagent.go b/agents/diamagent.go index 5dd16e6d8..422732721 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -36,11 +36,11 @@ import ( ) func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS, - sessionS rpcclient.RpcClientConnection) (*DiameterAgent, error) { - if sessionS != nil && reflect.ValueOf(sessionS).IsNil() { - sessionS = nil + sS rpcclient.RpcClientConnection) (*DiameterAgent, error) { + if sS != nil && reflect.ValueOf(sS).IsNil() { + sS = nil } - da := &DiameterAgent{cgrCfg: cgrCfg, filterS: filterS, sessionS: sessionS} + da := &DiameterAgent{cgrCfg: cgrCfg, filterS: filterS, sS: sS} dictsPath := cgrCfg.DiameterAgentCfg().DictionariesPath if len(dictsPath) != 0 { if err := loadDictionaries(dictsPath, utils.DiameterAgent); err != nil { @@ -61,15 +61,16 @@ func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS, procsr.ReplyFields = tpls } } + //da.sS.SetClientConn(da) // pass the connection to sessionS back so we can receive the disconnects return da, nil } type DiameterAgent struct { cgrCfg *config.CGRConfig filterS *engine.FilterS - sessionS rpcclient.RpcClientConnection // Connection towards CGR-SessionS component + sS rpcclient.RpcClientConnection // Connection towards CGR-SessionS component aReqs int - sync.RWMutex + aReqsLck sync.RWMutex } // ListenAndServe is called when DiameterAgent is started, usually from within cmd/cgr-engine @@ -160,7 +161,7 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { return } if da.cgrCfg.DiameterAgentCfg().MaxActiveReqs != -1 { - da.Lock() + da.aReqsLck.Lock() if da.aReqs == da.cgrCfg.DiameterAgentCfg().MaxActiveReqs { utils.Logger.Err( fmt.Sprintf("<%s> denying request due to maximum active requests reached: %d, message: %s", @@ -169,11 +170,11 @@ func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { return } da.aReqs++ - da.Unlock() + da.aReqsLck.Unlock() defer func() { // schedule decrement when returning out of function - da.Lock() + da.aReqsLck.Lock() da.aReqs-- - da.Unlock() + da.aReqsLck.Unlock() }() } rply := config.NewNavigableMap(nil) // share it among different processors @@ -266,7 +267,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor, reqProcessor.Flags.HasKey(utils.MetaSuppliersEventCost), *cgrEv) var authReply sessions.V1AuthorizeReply - err = da.sessionS.Call(utils.SessionSv1AuthorizeEvent, + err = da.sS.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply) if agReq.CGRReply, err = NewCGRReply(&authReply, err); err != nil { return @@ -279,7 +280,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor, reqProcessor.Flags.HasKey(utils.MetaThresholds), reqProcessor.Flags.HasKey(utils.MetaStats), *cgrEv) var initReply sessions.V1InitSessionReply - err = da.sessionS.Call(utils.SessionSv1InitiateSession, + err = da.sS.Call(utils.SessionSv1InitiateSession, initArgs, &initReply) if agReq.CGRReply, err = NewCGRReply(&initReply, err); err != nil { return @@ -289,7 +290,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor, reqProcessor.Flags.HasKey(utils.MetaAttributes), reqProcessor.Flags.HasKey(utils.MetaAccounts), *cgrEv) var updateReply sessions.V1UpdateSessionReply - err = da.sessionS.Call(utils.SessionSv1UpdateSession, + err = da.sS.Call(utils.SessionSv1UpdateSession, updateArgs, &updateReply) if agReq.CGRReply, err = NewCGRReply(&updateReply, err); err != nil { return @@ -301,7 +302,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor, reqProcessor.Flags.HasKey(utils.MetaThresholds), reqProcessor.Flags.HasKey(utils.MetaStats), *cgrEv) var tRply string - err = da.sessionS.Call(utils.SessionSv1TerminateSession, + err = da.sS.Call(utils.SessionSv1TerminateSession, terminateArgs, &tRply) if agReq.CGRReply, err = NewCGRReply(nil, err); err != nil { return @@ -315,7 +316,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor, reqProcessor.Flags.HasKey(utils.MetaStats), *cgrEv) var eventRply sessions.V1ProcessEventReply - err = da.sessionS.Call(utils.SessionSv1ProcessEvent, + err = da.sS.Call(utils.SessionSv1ProcessEvent, evArgs, &eventRply) if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { cgrEv.Event[utils.Usage] = 0 // avoid further debits @@ -331,7 +332,7 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor, if reqProcessor.Flags.HasKey(utils.MetaCDRs) && !reqProcessor.Flags.HasKey(utils.MetaDryRun) { var rplyCDRs string - if err = da.sessionS.Call(utils.SessionSv1ProcessCDR, + if err = da.sS.Call(utils.SessionSv1ProcessCDR, cgrEv, &rplyCDRs); err != nil { agReq.CGRReply.Set([]string{utils.Error}, err.Error(), false, false) } @@ -353,3 +354,19 @@ func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor, } return true, nil } + +// rpcclient.RpcClientConnection interface +func (da *DiameterAgent) Call(serviceMethod string, args interface{}, reply interface{}) error { + return utils.RPCCall(da, serviceMethod, args, reply) +} + +// V1DisconnectSession is part of the sessions.SessionSClient +func (da *DiameterAgent) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) error { + return utils.ErrNotImplemented +} + +// V1GetActiveSessionIDs is part of the sessions.SessionSClient +func (da *DiameterAgent) V1GetActiveSessionIDs(ignParam string, + sessionIDs *[]*sessions.SessionID) error { + return utils.ErrNotImplemented +} diff --git a/agents/diamagent_test.go b/agents/diamagent_test.go new file mode 100644 index 000000000..db1d741df --- /dev/null +++ b/agents/diamagent_test.go @@ -0,0 +1,28 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package agents + +import ( + "testing" + + "github.com/cgrates/cgrates/sessions" +) + +func TestDAsSessionSClientIface(t *testing.T) { + _ = sessions.SessionSClient(new(DiameterAgent)) +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 5fa19269d..607d996dc 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -333,20 +333,33 @@ func startAsteriskAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit exitChan <- true } -func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection, +func startDiameterAgent(internalSsChan chan rpcclient.RpcClientConnection, exitChan chan bool, filterSChan chan *engine.FilterS) { var err error utils.Logger.Info("Starting CGRateS DiameterAgent service") filterS := <-filterSChan filterSChan <- filterS - var smgConn *rpcclient.RpcClientPool - if len(cfg.DiameterAgentCfg().SessionSConns) != 0 { - smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, + var sS rpcclient.RpcClientConnection + var sSInternal bool + if len(cfg.DiameterAgentCfg().SessionSConns) == 0 { + utils.Logger.Crit( + fmt.Sprintf("<%s> no SessionS connections defined", + utils.DiameterAgent)) + exitChan <- true + return + } + if cfg.DiameterAgentCfg().SessionSConns[0].Address == utils.MetaInternal { + sSInternal = true + sSIntConn := <-internalSsChan + internalSsChan <- sSIntConn + sS = utils.NewBiRPCInternalClient(sSIntConn.(*sessions.SMGeneric)) + } else { + sS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DiameterAgentCfg().SessionSConns, internalSMGChan, + cfg.DiameterAgentCfg().SessionSConns, internalSsChan, cfg.GeneralCfg().InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", @@ -355,12 +368,15 @@ func startDiameterAgent(internalSMGChan chan rpcclient.RpcClientConnection, return } } - da, err := agents.NewDiameterAgent(cfg, filterS, smgConn) + da, err := agents.NewDiameterAgent(cfg, filterS, sS) if err != nil { utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) exitChan <- true return } + if sSInternal { // bidirectional client backwards connection + sS.(*utils.BiRPCInternalClient).SetClientConn(da) + } if err = da.ListenAndServe(); err != nil { utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) } diff --git a/config/config.go b/config/config.go index 4bc83262a..b3d75d8a1 100755 --- a/config/config.go +++ b/config/config.go @@ -605,11 +605,13 @@ func (self *CGRConfig) checkConfigSanity() error { } } // DAgent checks - if self.diameterAgentCfg.Enabled && !self.sessionSCfg.Enabled { - for _, daSMGConn := range self.diameterAgentCfg.SessionSConns { - if daSMGConn.Address == utils.MetaInternal { - return fmt.Errorf("%s not enabled but referenced by %s component", - utils.SessionS, utils.DiameterAgent) + if self.diameterAgentCfg.Enabled { + if !self.sessionSCfg.Enabled { + for _, daSMGConn := range self.diameterAgentCfg.SessionSConns { + if daSMGConn.Address == utils.MetaInternal { + return fmt.Errorf("%s not enabled but referenced by %s component", + utils.SessionS, utils.DiameterAgent) + } } } }